You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/11/28 11:32:01 UTC
svn commit: r721415 - in /hadoop/core/trunk: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Author: yhemanth
Date: Fri Nov 28 02:32:01 2008
New Revision: 721415
URL: http://svn.apache.org/viewvc?rev=721415&view=rev
Log:
HADOOP-4513. Initialize jobs asynchronously in the capacity scheduler. Contributed by Sreekanth Ramakrishnan.
Added:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/capacity-scheduler.xml.template
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Nov 28 02:32:01 2008
@@ -144,6 +144,9 @@
HADOOP-2774. Add counters tracking records spilled to disk in MapTask and
ReduceTask. (Ravi Gummadi via cdouglas)
+ HADOOP-4513. Initialize jobs asynchronously in the capacity scheduler.
+ (Sreekanth Ramakrishnan via yhemanth)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Fri Nov 28 02:32:01 2008
@@ -46,6 +46,13 @@
value of 100 implies no user limits are imposed.
</description>
</property>
+ <property>
+ <name>mapred.capacity-scheduler.queue.default.maximum-initialized-jobs-per-user</name>
+ <value>2</value>
+ <description>The maximum number of jobs to be pre-initialized for a user
+ of the job queue.
+ </description>
+ </property>
<!-- The default configuration settings for the capacity task scheduler -->
<!-- The default values would be applied to all the queues which don't have -->
@@ -74,4 +81,35 @@
for the job queue at any given point of time by default.
</description>
</property>
+
+ <property>
+ <name>mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user</name>
+ <value>2</value>
+ <description>The maximum number of jobs to be pre-initialized for a user
+ of the job queue.
+ </description>
+ </property>
+
+
+ <!-- Capacity scheduler Job Initialization configuration parameters -->
+ <property>
+ <name>mapred.capacity-scheduler.init-poll-interval</name>
+ <value>5000</value>
+ <description>The amount of time in milliseconds which is used to poll
+ the job queues for jobs to initialize.
+ </description>
+ </property>
+ <property>
+ <name>mapred.capacity-scheduler.init-worker-threads</name>
+ <value>5</value>
+ <description>Number of worker threads which would be used by
+ Initialization poller to initialize jobs in a set of queues.
+ If number mentioned in property is equal to number of job queues
+ then a single thread would initialize jobs in a queue. If lesser
+ then a thread would get a set of queues assigned. If the number
+ is greater then number of threads would be equal to number of
+ job queues.
+ </description>
+ </property>
+
</configuration>
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Nov 28 02:32:01 2008
@@ -17,8 +17,6 @@
package org.apache.hadoop.mapred;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -46,6 +44,8 @@
"mapred.capacity-scheduler.queue.";
private Configuration rmConf;
+
+ private int defaultMaxJobsPerUsersToInitialize;
/**
* Create a new ResourceManagerConf.
@@ -83,6 +83,9 @@
"mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
defaultSupportPriority = rmConf.getBoolean(
"mapred.capacity-scheduler.default-supports-priority", false);
+ defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
+ "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
+ 2);
}
/**
@@ -247,5 +250,102 @@
String property) {
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
}
+
+ /**
+ * Gets the maximum number of jobs which are allowed to initialize in the
+ * job queue.
+ *
+ * @param queue queue name.
+ * @return maximum number of jobs allowed to be initialized per user.
+ * @throws IllegalArgumentException if the maximum number of jobs per user
+ * is negative or zero.
+ */
+ public int getMaxJobsPerUserToInitialize(String queue) {
+ int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
+ "maximum-initialized-jobs-per-user"),
+ defaultMaxJobsPerUsersToInitialize);
+ if(maxJobsPerUser <= 0) {
+ throw new IllegalArgumentException(
+ "Invalid maximum jobs per user configuration " + maxJobsPerUser);
+ }
+ return maxJobsPerUser;
+ }
+ /**
+ * Sets the maximum number of jobs which are allowed to be initialized
+ * for a user in the queue.
+ *
+ * @param queue queue name.
+ * @param value maximum number of jobs allowed to be initialized per user.
+ */
+ public void setMaxJobsPerUserToInitialize(String queue, int value) {
+ rmConf.setInt(toFullPropertyName(queue,
+ "maximum-initialized-jobs-per-user"), value);
+ }
+
+ /**
+ * Amount of time in milliseconds which poller thread and initialization
+ * thread would sleep before looking at the queued jobs.
+ *
+ * @return time in milliseconds.
+ * @throws IllegalArgumentException if time is negative or zero.
+ */
+ public long getSleepInterval() {
+ long sleepInterval = rmConf.getLong(
+ "mapred.capacity-scheduler.init-poll-interval", -1);
+
+ if(sleepInterval <= 0) {
+ throw new IllegalArgumentException(
+ "Invalid initializater poller interval " + sleepInterval);
+ }
+
+ return sleepInterval;
+ }
+
+ /**
+ * Gets maximum number of threads which are spawned to initialize jobs
+ * in job queue in parallel. The number of threads should be always less than
+ * or equal to number of job queues present.
+ *
+ * If number of threads is configured to be more than job queues present,
+ * then number of job queues is used as number of threads used for initializing
+ * jobs.
+ *
+ * So a given thread can have responsibility of initializing jobs from more
+ * than one queue.
+ *
+ * @return maximum number of threads spawned to initialize jobs in job queue
+ * in parallel.
+ */
+ public int getMaxWorkerThreads() {
+ int maxWorkerThreads = rmConf.getInt(
+ "mapred.capacity-scheduler.init-worker-threads", 0);
+ if(maxWorkerThreads <= 0) {
+ throw new IllegalArgumentException(
+ "Invalid initializater worker thread number " + maxWorkerThreads);
+ }
+ return maxWorkerThreads;
+ }
+ /**
+ * Set the sleep interval which initialization poller would sleep before
+ * it looks at the jobs in the job queue.
+ *
+ * @param interval sleep interval
+ */
+ public void setSleepInterval(long interval) {
+ rmConf.setLong(
+ "mapred.capacity-scheduler.init-poll-interval", interval);
+ }
+
+ /**
+ * Sets number of threads which can be spawned to initialize jobs in
+ * parallel.
+ *
+ * @param poolSize number of threads to be spawned to initialize jobs
+ * in parallel.
+ */
+ public void setMaxWorkerThreads(int poolSize) {
+ rmConf.setInt(
+ "mapred.capacity-scheduler.init-worker-threads", poolSize);
+ }
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Nov 28 02:32:01 2008
@@ -534,16 +534,7 @@
* consider the first few jobs per user.
*/
}
- // update stats on waiting jobs
- for (JobInProgress j:
- scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
- // pending tasks
- if (qsi.numPendingTasks > getClusterCapacity()) {
- // that's plenty. no need for more computation
- break;
- }
- qsi.numPendingTasks += getPendingTasks(j);
- }
+ //TODO do we need to update stats on waiting jobs
}
}
@@ -665,26 +656,7 @@
}
}
- // if we're here, we found nothing in the running jobs. Time to
- // look at waiting jobs. Get first job of a user that is not over limit
- for (JobInProgress j:
- scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
- // is this job's user over limit?
- if (usersOverLimit.contains(j.getProfile().getUser())) {
- // user over limit.
- continue;
- }
- // this job is a candidate for running. Initialize it, move it
- // to run queue
- j.initTasks();
- // We found a suitable job. Get task from it.
- t = obtainNewTask(taskTracker, j);
- if (t != null) {
- LOG.debug("Getting task from job " +
- j.getJobID() + " in queue " + qsi.queueName);
- return t;
- }
- }
+
// if we're here, we haven't found anything. This could be because
// there is nothing to run, or that the user limit for some user is
@@ -705,19 +677,6 @@
}
}
}
- // look at waiting jobs the same way
- for (JobInProgress j:
- scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
- if (usersOverLimit.contains(j.getProfile().getUser())) {
- j.initTasks();
- t = obtainNewTask(taskTracker, j);
- if (t != null) {
- LOG.debug("Getting task from job " +
- j.getJobID() + " in queue " + qsi.queueName);
- return t;
- }
- }
- }
}
return null;
@@ -766,12 +725,10 @@
for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
Collection<JobInProgress> runJobs =
scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
- Collection<JobInProgress> waitJobs =
- scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName);
s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" +
qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity +
", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() +
- ", wait jobs=" + waitJobs.size() + "*** ");
+ "*** ");
}
LOG.debug(s);
}
@@ -965,8 +922,9 @@
}
}
private Clock clock;
+ private JobInitializationPoller initializationPoller;
+
-
public CapacityTaskScheduler() {
this(new Clock());
}
@@ -1053,6 +1011,14 @@
// listen to job changes
taskTrackerManager.addJobInProgressListener(jobQueuesManager);
+ //Start thread for initialization
+ if (initializationPoller == null) {
+ this.initializationPoller = new JobInitializationPoller(
+ jobQueuesManager,rmConf,queues);
+ }
+ initializationPoller.init(queueManager.getQueues(), rmConf);
+ initializationPoller.setDaemon(true);
+ initializationPoller.start();
// start thread for redistributing capacity
this.reclaimCapacityThread =
new Thread(new ReclaimCapacity(),"reclaimCapacity");
@@ -1061,6 +1027,11 @@
LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
}
+ /** mostly for testing purposes */
+ void setInitializationPoller(JobInitializationPoller p) {
+ this.initializationPoller = p;
+ }
+
@Override
public synchronized void terminate() throws IOException {
if (!started) return;
@@ -1071,6 +1042,7 @@
// tell the reclaim thread to stop
stopReclaim = true;
started = false;
+ initializationPoller.terminate();
super.terminate();
}
@@ -1172,13 +1144,20 @@
jobCollection.addAll(runningJobs);
}
Collection<JobInProgress> waitingJobs =
- jobQueuesManager.getWaitingJobQueue(queueName);
+ jobQueuesManager.getJobs(queueName);
+ Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
if(waitingJobs != null) {
- jobCollection.addAll(waitingJobs);
+ tempCollection.addAll(waitingJobs);
+ }
+ tempCollection.removeAll(runningJobs);
+ if(!tempCollection.isEmpty()) {
+ jobCollection.addAll(tempCollection);
}
return jobCollection;
}
-
+ JobInitializationPoller getInitializationPoller() {
+ return initializationPoller;
+ }
}
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=721415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Fri Nov 28 02:32:01 2008
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class asynchronously initializes jobs submitted to the
+ * Map/Reduce cluster running with the {@link CapacityTaskScheduler}.
+ *
+ * <p>
+ * The class consists of a main poller thread, and a set of worker
+ * threads that together initialize the jobs. The poller thread periodically
+ * looks at jobs submitted to the scheduler, and selects a set of them
+ * to be initialized. It passes these to the worker threads for initializing.
+ * Each worker thread is configured to look at jobs submitted to a fixed
+ * set of queues. It initializes jobs in a round robin manner - selecting
+ * the first job in order from each queue ready to be initialized.
+ * </p>
+ *
+ * <p>
+ * An initialized job occupies memory resources on the Job Tracker. Hence,
+ * the poller limits the number of jobs initialized at any given time to
+ * a configured limit. The limit is specified per user per queue.
+ * </p>
+ *
+ * <p>
+ * However, since a job needs to be initialized before the scheduler can
+ * select tasks from it to run, it tries to keep a backlog of jobs
+ * initialized so the scheduler does not need to wait and let empty slots
+ * go to waste. The core logic of the poller is to pick up the right jobs,
+ * which have a good potential to be run next by the scheduler. To do this,
+ * it picks up jobs submitted across users and across queues to account
+ * both for guaranteed capacities and user limits. It also always initializes
+ * high priority jobs, whenever they need to be initialized, even if this
+ * means going over the limit for initialized jobs.
+ * </p>
+ */
+public class JobInitializationPoller extends Thread {
+
+ private static final Log LOG = LogFactory
+ .getLog(JobInitializationPoller.class.getName());
+
+ /*
+ * The poller picks up jobs across users to initialize based on user limits.
+ * Suppose the user limit for a queue is 25%, it means at most 4 users' jobs
+ * can run together. However, in order to account for jobs from a user that
+ * might complete faster than others, it initializes jobs from an additional
+ * number of users as a backlog. This variable defines the additional
+ * number of users whose jobs can be considered for initializing.
+ */
+ private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
+
+ private JobQueuesManager jobQueueManager;
+ private long sleepInterval;
+ private int poolSize;
+
+ /**
+ * A worker thread that initializes jobs in one or more queues assigned to
+ * it.
+ *
+ * Jobs are initialized in a round robin fashion one from each queue at a
+ * time.
+ */
+ class JobInitializationThread extends Thread {
+
+ private JobInProgress initializingJob;
+
+ private volatile boolean startIniting;
+ private AtomicInteger currentJobCount = new AtomicInteger(0); // number of jobs to initialize
+
+ /**
+ * The hash map which maintains relationship between queue to jobs to
+ * initialize per queue.
+ */
+ private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
+
+ public JobInitializationThread() {
+ startIniting = true;
+ jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
+ }
+
+ @Override
+ public void run() {
+ while (startIniting) {
+ initializeJobs();
+ try {
+ if (startIniting) {
+ Thread.sleep(sleepInterval);
+ } else {
+ break;
+ }
+ } catch (Throwable t) {
+ }
+ }
+ }
+
+ // The key method that initializes jobs from queues
+ // This method is package-private to allow test cases to call it
+ // synchronously in a controlled manner.
+ void initializeJobs() {
+ // while there are more jobs to initialize...
+ while (currentJobCount.get() > 0) {
+ Set<String> queues = jobsPerQueue.keySet();
+ for (String queue : queues) {
+ JobInProgress job = getFirstJobInQueue(queue);
+ if (job == null) {
+ continue;
+ }
+ LOG.info("Initializing job : " + job.getJobID() + " in Queue "
+ + job.getProfile().getQueueName() + " For user : "
+ + job.getProfile().getUser());
+ try {
+ if (startIniting) {
+ setInitializingJob(job);
+ job.initTasks();
+ setInitializingJob(null);
+ } else {
+ break;
+ }
+ } catch (Throwable t) {
+ LOG.info("Job initialization failed:\n"
+ + StringUtils.stringifyException(t));
+ if (job != null)
+ job.fail();
+ }
+ }
+ }
+ }
+
+ /**
+ * This method returns the first job in the queue and removes the same.
+ *
+ * @param queue
+ * queue name
+ * @return First job in the queue and removes it.
+ */
+ private JobInProgress getFirstJobInQueue(String queue) {
+ TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
+ .get(queue);
+ synchronized (jobsList) {
+ if (jobsList.isEmpty()) {
+ return null;
+ }
+ Iterator<JobInProgress> jobIterator = jobsList.values().iterator();
+ JobInProgress job = jobIterator.next();
+ jobIterator.remove();
+ currentJobCount.getAndDecrement();
+ return job;
+ }
+ }
+
+ /*
+ * Test method to check if the thread is currently initialising the job
+ */
+ synchronized JobInProgress getInitializingJob() {
+ return this.initializingJob;
+ }
+
+ synchronized void setInitializingJob(JobInProgress job) {
+ this.initializingJob = job;
+ }
+
+ void terminate() {
+ startIniting = false;
+ }
+
+ void addJobsToQueue(String queue, JobInProgress job) {
+ TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
+ .get(queue);
+ if (jobs == null) {
+ LOG.error("Invalid queue passed to the thread : " + queue
+ + " For job :: " + job.getJobID());
+ }
+ synchronized (jobs) {
+ JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
+ jobs.put(schedInfo, job);
+ currentJobCount.getAndIncrement();
+ }
+ }
+
+ void addQueue(String queue) {
+ TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
+ jobQueueManager.getComparator(queue));
+ jobsPerQueue.put(queue, jobs);
+ }
+ }
+
+ /**
+ * The queue information class maintains the following information per queue:
+ * Maximum users allowed to initialize jobs in the particular queue. Maximum
+ * jobs allowed to be initialized per user in the queue.
+ *
+ */
+ private class QueueInfo {
+ String queue;
+ int maxUsersAllowedToInitialize;
+ int maxJobsPerUserToInitialize;
+
+ public QueueInfo(String queue, int maxUsersAllowedToInitialize,
+ int maxJobsPerUserToInitialize) {
+ this.queue = queue;
+ this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
+ this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
+ }
+ }
+
+ /**
+ * Map which contains the configuration used for initializing jobs
+ * associated with a particular job queue.
+ */
+ private HashMap<String, QueueInfo> jobQueues;
+
+ /**
+ * Set of jobs which have been passed to Initialization threads.
+ * This is maintained so that we don't call initTasks() for same job twice.
+ */
+ private HashSet<JobID> initializedJobs;
+
+ private volatile boolean running;
+
+ /**
+ * The map which provides information which thread should be used to
+ * initialize jobs for a given job queue.
+ */
+ private HashMap<String, JobInitializationThread> threadsToQueueMap;
+
+ public JobInitializationPoller(JobQueuesManager mgr,
+ CapacitySchedulerConf rmConf, Set<String> queue) {
+ initializedJobs = new HashSet<JobID>();
+ jobQueues = new HashMap<String, QueueInfo>();
+ this.jobQueueManager = mgr;
+ threadsToQueueMap = new HashMap<String, JobInitializationThread>();
+ super.setName("JobInitializationPollerThread");
+ running = true;
+ }
+
+ /*
+ * method to read all configuration values required by the initialisation
+ * poller
+ */
+
+ void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
+ for (String queue : queues) {
+ int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
+ int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
+ int maxJobsPerUserToInitialize = capacityConf
+ .getMaxJobsPerUserToInitialize(queue);
+ QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
+ maxJobsPerUserToInitialize);
+ jobQueues.put(queue, qi);
+ }
+ sleepInterval = capacityConf.getSleepInterval();
+ poolSize = capacityConf.getMaxWorkerThreads();
+ if (poolSize > queues.size()) {
+ poolSize = queues.size();
+ }
+ assignThreadsToQueues();
+ Collection<JobInitializationThread> threads = threadsToQueueMap.values();
+ for (JobInitializationThread t : threads) {
+ if (!t.isAlive()) {
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+ }
+
+ public void run() {
+ while (running) {
+ try {
+ selectJobsToInitialize();
+ if (!this.isInterrupted()) {
+ Thread.sleep(sleepInterval);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Job Initialization poller interrupted"
+ + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ // The key method that picks up jobs to initialize for each queue.
+ // The jobs picked up are added to the worker thread that is handling
+ // initialization for that queue.
+ // The method is package private to allow tests to call it synchronously
+ // in a controlled manner.
+ void selectJobsToInitialize() {
+ for (String queue : jobQueues.keySet()) {
+ ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
+ //if (LOG.isDebugEnabled()) {
+ printJobs(jobsToInitialize);
+ //}
+ JobInitializationThread t = threadsToQueueMap.get(queue);
+ for (JobInProgress job : jobsToInitialize) {
+ t.addJobsToQueue(queue, job);
+ }
+ }
+ }
+
+ private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
+ for (JobInProgress job : jobsToInitialize) {
+ LOG.info("Passing to Initializer Job Id :" + job.getJobID()
+ + " User: " + job.getProfile().getUser() + " Queue : "
+ + job.getProfile().getQueueName());
+ }
+ }
+
+ // This method exists to be overridden by test cases that wish to
+ // create a test-friendly worker thread which can be controlled
+ // synchronously.
+ JobInitializationThread createJobInitializationThread() {
+ return new JobInitializationThread();
+ }
+
+ private void assignThreadsToQueues() {
+ int countOfQueues = jobQueues.size();
+ String[] queues = (String[]) jobQueues.keySet().toArray(
+ new String[countOfQueues]);
+ int numberOfQueuesPerThread = countOfQueues / poolSize;
+ int numberOfQueuesAssigned = 0;
+ for (int i = 0; i < poolSize; i++) {
+ JobInitializationThread initializer = createJobInitializationThread();
+ int batch = (i * numberOfQueuesPerThread);
+ for (int j = batch; j < (batch + numberOfQueuesPerThread); j++) {
+ initializer.addQueue(queues[j]);
+ threadsToQueueMap.put(queues[j], initializer);
+ numberOfQueuesAssigned++;
+ }
+ }
+
+ if (numberOfQueuesAssigned < countOfQueues) {
+ // Assign remaining queues in round robin fashion to other queues
+ int startIndex = 0;
+ for (int i = numberOfQueuesAssigned; i < countOfQueues; i++) {
+ JobInitializationThread t = threadsToQueueMap
+ .get(queues[startIndex]);
+ t.addQueue(queues[i]);
+ threadsToQueueMap.put(queues[i], t);
+ startIndex++;
+ }
+ }
+ }
+
+ /*
+ * Select jobs to be initialized for a given queue.
+ *
+ * The jobs are selected such that they are within the limits
+ * for number of users and number of jobs per user in the queue.
+ * The only exception is if high priority jobs are waiting to be
+ * initialized. In that case, we could exceed the configured limits.
+ * However, we try to restrict the excess to a minimum.
+ */
+ ArrayList<JobInProgress> getJobsToInitialize(String queue) {
+ QueueInfo qi = jobQueues.get(queue);
+ ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
+ // use the configuration parameter which is configured for the particular
+ // queue.
+ int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
+ int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
+ // calculate maximum number of jobs which can be allowed to initialize
+ // for this queue.
+ // This value is used when a user submits a high priority job after we
+ // have initialized jobs for that queue and none of them is scheduled.
+ // This would prevent us from initializing extra jobs for that particular
+ // user. Explanation given at end of method.
+ int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
+ * maxJobsPerUserAllowedToInitialize;
+ Collection<JobInProgress> jobs = jobQueueManager.getJobs(queue);
+ int countOfJobsInitialized = 0;
+ HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
+ for (JobInProgress job : jobs) {
+ /*
+ * First check if job has been scheduled or completed or killed. If so
+ * then remove from uninitialised jobs. Remove from Job queue
+ */
+ if ((job.getStatus().getRunState() == JobStatus.RUNNING)
+ && (job.runningMaps() > 0 || job.runningReduces() > 0
+ || job.finishedMaps() > 0 || job.finishedReduces() > 0)) {
+ LOG.debug("Removing from the queue " + job.getJobID());
+ initializedJobs.remove(job.getJobID());
+ jobQueueManager.removeJobFromQueue(job);
+ continue;
+ } else if (job.isComplete()) {
+ LOG.debug("Removing from completed job from " + "the queue "
+ + job.getJobID());
+ initializedJobs.remove(job.getJobID());
+ jobQueueManager.removeJobFromQueue(job);
+ continue;
+ }
+ String user = job.getProfile().getUser();
+ int numberOfJobs = userJobsInitialized.get(user) == null ? 0
+ : userJobsInitialized.get(user);
+ // If the job is already initialized then add the count against user
+ // then continue.
+ if (initializedJobs.contains(job.getJobID())) {
+ userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
+ countOfJobsInitialized++;
+ continue;
+ }
+ boolean isUserPresent = userJobsInitialized.containsKey(user);
+ /*
+ * If the user is not present in user list and size of user list is less
+ * than maximum allowed users to initialize then initialize this job and add
+ * this user to the global list.
+ *
+ * Else if he is present we check if his number of jobs has not crossed
+ * his quota and global quota.
+ *
+ * The logic behind using a global per queue job can be understood by example
+ * below: Consider 3 users submitting normal priority job in a job queue with
+ * user limit as 100. (Max jobs per user = 2)
+ *
+ * U1J1,U1J2,U1J3....,U3J3.
+ *
+ * Jobs initialized would be
+ *
+ * U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
+ *
+ * Now consider a case where U4 comes in and submits a high priority job.
+ *
+ * U4J1 --- High Priority JOb, U4J2---- Normal priority job.
+ *
+ * So, if we dont use global per queue value we would end up initializing both
+ * U4 jobs which is not correct.
+ *
+ * By using a global value we ensure that we dont initialize any extra jobs
+ * for a user.
+ */
+ if (!isUserPresent
+ && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
+ // this is a new user being considered and the number of users
+ // is within limits.
+ userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
+ jobsToInitialize.add(job);
+ initializedJobs.add(job.getJobID());
+ countOfJobsInitialized++;
+ } else if (isUserPresent
+ && numberOfJobs < maxJobsPerUserAllowedToInitialize
+ && countOfJobsInitialized < maxJobsPerQueueToInitialize) {
+ /*
+ * this is an existing user and the number of jobs per user
+ * is within limits, as also the number of jobs per queue.
+ * We need the check on number of jobs per queue to restrict
+ * the number of jobs we initialize over the limit due to high
+ * priority jobs.
+ *
+ * For e.g Consider 3 users submitting normal priority job in
+ * a job queue with user limit as 100 and max jobs per user as 2
+ * Say the jobs are U1J1,U1J2,U1J3....,U3J3.
+ *
+ * Jobs initialized would be U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
+ *
+ * Now consider a case where U4 comes in and submits a high priority job
+ * and a normal priority job. Say U4J1 and U4J2
+ *
+ * If we dont consider the number of jobs per queue we would end up
+ * initializing both jobs from U4. Initializing the second job is
+ * unnecessary.
+ */
+ userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
+ jobsToInitialize.add(job);
+ initializedJobs.add(job.getJobID());
+ countOfJobsInitialized++;
+ }
+ }
+ return jobsToInitialize;
+ }
+
+
+ void terminate() {
+ running = false;
+ for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
+ .entrySet()) {
+ JobInitializationThread t = entry.getValue();
+ if (t.isAlive()) {
+ t.terminate();
+ t.interrupt();
+ }
+ }
+ }
+
+ /*
+ * Test method used only for testing purposes.
+ */
+ JobInProgress getInitializingJob(String queue) {
+ JobInitializationThread t = threadsToQueueMap.get(queue);
+ if (t == null) {
+ return null;
+ } else {
+ return t.getInitializingJob();
+ }
+ }
+
+ HashSet<JobID> getInitializedJobList() {
+ return initializedJobs;
+ }
+}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Fri Nov 28 02:32:01 2008
@@ -18,8 +18,10 @@
package org.apache.hadoop.mapred;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
@@ -62,27 +64,62 @@
// whether the queue supports priorities
boolean supportsPriorities;
- Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
+ Map<JobSchedulingInfo, JobInProgress> jobList; // for waiting jobs
Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
+ public Comparator<JobSchedulingInfo> comparator;
+
QueueInfo(boolean prio) {
this.supportsPriorities = prio;
if (supportsPriorities) {
// use the default priority-aware comparator
- this.waitingJobs =
- new TreeMap<JobSchedulingInfo, JobInProgress>(
- JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
- this.runningJobs =
- new TreeMap<JobSchedulingInfo, JobInProgress>(
- JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
+ comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
}
else {
- this.waitingJobs =
- new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
- this.runningJobs =
- new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
+ comparator = STARTTIME_JOB_COMPARATOR;
+ }
+ jobList = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+ runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+ }
+
+ Collection<JobInProgress> getJobs() {
+ synchronized (jobList) {
+ return Collections.unmodifiableCollection(
+ new LinkedList<JobInProgress>(jobList.values()));
+ }
+ }
+
+ Collection<JobInProgress> getRunningJobs() {
+ synchronized (runningJobs) {
+ return Collections.unmodifiableCollection(
+ new LinkedList<JobInProgress>(runningJobs.values()));
}
}
+
+ void addRunningJob(JobInProgress job) {
+ synchronized (runningJobs) {
+ runningJobs.put(new JobSchedulingInfo(job),job);
+ }
+ }
+
+ JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
+ synchronized (runningJobs) {
+ return runningJobs.remove(jobInfo);
+ }
+ }
+
+ JobInProgress removeJob(JobSchedulingInfo schedInfo) {
+ synchronized (jobList) {
+ return jobList.remove(schedInfo);
+ }
+ }
+
+ void addJob(JobInProgress job) {
+ synchronized (jobList) {
+ jobList.put(new JobSchedulingInfo(job), job);
+ }
+ }
+
}
// we maintain a hashmap of queue-names to queue info
@@ -109,14 +146,15 @@
* Returns the queue of running jobs associated with the name
*/
public Collection<JobInProgress> getRunningJobQueue(String queueName) {
- return jobQueues.get(queueName).runningJobs.values();
+ return jobQueues.get(queueName).getRunningJobs();
}
/**
- * Returns the queue of waiting jobs associated with the name
+ * Returns the queue of Uninitialised jobs associated with queue name.
+ *
*/
- public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
- return jobQueues.get(queueName).waitingJobs.values();
+ public Collection<JobInProgress> getJobs(String queueName) {
+ return jobQueues.get(queueName).getJobs();
}
@Override
@@ -133,19 +171,21 @@
}
// add job to waiting queue. It will end up in the right place,
// based on priority.
- qi.waitingJobs.put(new JobSchedulingInfo(job), job);
+ qi.addJob(job);
// let scheduler know.
scheduler.jobAdded(job);
}
+ /*
+ * Only the removal of running jobs is done by the JobQueuesManager.
+ * The removal of jobs from the job queue is taken care of by the
+ * JobInitializationPoller.
+ */
private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo,
QueueInfo qi) {
LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
- + job.getProfile().getQueueName() + " has completed");
- // job could be in running or waiting queue
- if (qi.runningJobs.remove(oldInfo) != null) {
- qi.waitingJobs.remove(oldInfo);
- }
+ + job.getProfile().getQueueName() + " has completed");
+ qi.removeRunningJob(oldInfo);
// let scheduler know
scheduler.jobCompleted(job);
}
@@ -159,25 +199,21 @@
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo,
QueueInfo qi) {
- JobSchedulingInfo newInfo = new JobSchedulingInfo(job);
- if (qi.waitingJobs.remove(oldInfo) == null) {
- qi.runningJobs.remove(oldInfo);
- // Add back to the running queue
- qi.runningJobs.put(newInfo, job);
- } else {
- // Add back to the waiting queue
- qi.waitingJobs.put(newInfo, job);
+ if(qi.removeJob(oldInfo) != null) {
+ qi.addJob(job);
+ }
+ if(qi.removeRunningJob(oldInfo) != null) {
+ qi.addRunningJob(job);
}
}
// This is used to move a job from the waiting queue to the running queue.
private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo,
QueueInfo qi) {
- // Remove from the waiting queue
- qi.waitingJobs.remove(oldInfo);
-
+ // Removing the job from the job list is the responsibility of the
+ // initialization poller.
// Add the job to the running queue
- qi.runningJobs.put(new JobSchedulingInfo(job), job);
+ qi.addRunningJob(job);
}
// Update the scheduler as job's state has changed
@@ -221,4 +257,13 @@
}
}
+ public void removeJobFromQueue(JobInProgress job) {
+ String queue = job.getProfile().getQueueName();
+ QueueInfo qi = jobQueues.get(queue);
+ qi.removeJob(new JobSchedulingInfo(job));
+ }
+
+ Comparator<JobSchedulingInfo> getComparator(String queue) {
+ return jobQueues.get(queue).comparator;
+ }
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Nov 28 02:32:01 2008
@@ -23,7 +23,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -34,13 +33,109 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-//import org.apache.hadoop.mapred.CapacityTaskScheduler;
import org.apache.hadoop.conf.Configuration;
+
public class TestCapacityScheduler extends TestCase {
private static int jobCounter;
+ /**
+ * Test class that removes the asynchronous nature of job initialization.
+ *
+ * The run method is a dummy which just waits for completion. It is
+ * expected that test code calls the main method, initializeJobs, directly
+ * to trigger initialization.
+ */
+ class ControlledJobInitializer extends
+ JobInitializationPoller.JobInitializationThread {
+
+ boolean stopRunning;
+
+ public ControlledJobInitializer(JobInitializationPoller p) {
+ p.super();
+ }
+
+ @Override
+ public void run() {
+ while (!stopRunning) {
+ try {
+ synchronized(this) {
+ this.wait();
+ }
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ }
+
+ void stopRunning() {
+ stopRunning = true;
+ }
+ }
+
+ /**
+ * Test class that removes the asynchronous nature of job initialization.
+ *
+ * The run method is a dummy which just waits for completion. It is
+ * expected that test code calls the main method, selectJobsToInitialize,
+ * directly to trigger initialization.
+ *
+ * The class also creates the test worker thread objects of type
+ * ControlledJobInitializer instead of the objects of the actual class
+ */
+ class ControlledInitializationPoller extends JobInitializationPoller {
+
+ private boolean stopRunning;
+ private ArrayList<ControlledJobInitializer> workers;
+
+ public ControlledInitializationPoller(JobQueuesManager mgr,
+ CapacitySchedulerConf rmConf,
+ Set<String> queues) {
+ super(mgr, rmConf, queues);
+ }
+
+ @Override
+ public void run() {
+ // don't do anything here.
+ while (!stopRunning) {
+ try {
+ synchronized (this) {
+ this.wait();
+ }
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ JobInitializationThread createJobInitializationThread() {
+ ControlledJobInitializer t = new ControlledJobInitializer(this);
+ if (workers == null) {
+ workers = new ArrayList<ControlledJobInitializer>();
+ }
+ workers.add(t);
+ return t;
+ }
+
+ @Override
+ void selectJobsToInitialize() {
+ super.selectJobsToInitialize();
+ for (ControlledJobInitializer t : workers) {
+ t.initializeJobs();
+ }
+ }
+
+ void stopRunning() {
+ stopRunning = true;
+ for (ControlledJobInitializer t : workers) {
+ t.stopRunning();
+ t.interrupt();
+ }
+ }
+ }
+
static class FakeJobInProgress extends JobInProgress {
private FakeTaskTrackerManager taskTrackerManager;
@@ -208,12 +303,18 @@
new HashMap<String, TaskStatus>();
public FakeTaskTrackerManager() {
+ this(2, 1);
+ }
+
+ public FakeTaskTrackerManager(int maxMapSlots, int maxReduceSlots) {
trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
+ maxMapSlots, maxReduceSlots));
+ maxMapTasksPerTracker = maxMapSlots;
trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
+ maxMapSlots, maxReduceSlots));
+ maxReduceTasksPerTracker = maxReduceSlots;
}
public void addTaskTracker(String ttName) {
@@ -405,6 +506,16 @@
public boolean isPrioritySupported(String queue) {
return queueMap.get(queue).supportsPrio;
}
+
+ @Override
+ public long getSleepInterval() {
+ return 1;
+ }
+
+ @Override
+ public int getMaxWorkerThreads() {
+ return 1;
+ }
}
protected class FakeClock extends CapacityTaskScheduler.Clock {
@@ -522,8 +633,9 @@
scheduler.jobQueuesManager.jobUpdated(event);
// check if the jobs are missing from the waiting queue
- assertEquals("Waiting queue is garbled on job init", 0,
- scheduler.jobQueuesManager.getWaitingJobQueue("default")
+ // The jobs are not removed from waiting queue until they are scheduled
+ assertEquals("Waiting queue is garbled on job init", 2,
+ scheduler.jobQueuesManager.getJobs("default")
.size());
// test if changing the job priority/start-time works as expected in the
@@ -550,7 +662,7 @@
// check if the running queue size is correct
assertEquals("Job finish garbles the queue",
1, rqueue.size());
-
+
}
// test if the queue reflects the changes
@@ -607,7 +719,7 @@
private JobInProgress[] getJobsInQueue(boolean waiting) {
Collection<JobInProgress> queue =
waiting
- ? scheduler.jobQueuesManager.getWaitingJobQueue("default")
+ ? scheduler.jobQueuesManager.getJobs("default")
: scheduler.jobQueuesManager.getRunningJobQueue("default");
return queue.toArray(new JobInProgress[0]);
}
@@ -711,24 +823,23 @@
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
+ HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
+ submitJobs(1, 4, "default");
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
- // submit a job
- JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ while(mgr.getJobs("default").size() < 4){
+ Thread.sleep(1);
+ }
+ //Raise status change events for jobs submitted.
+ raiseStatusChangeEvents(mgr);
+ Collection<JobInProgress> jobs = scheduler.getJobs("default");
- // submit another job
- JobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+ assertTrue("Number of jobs returned by scheduler is wrong"
+ ,jobs.size() == 4);
- Collection<JobInProgress> jobs = scheduler.getJobs("default");
- assertEquals(2, jobs.size());
- Iterator<JobInProgress> iter = jobs.iterator();
- assertEquals(j1, iter.next());
- assertEquals(j2, iter.next());
-
- assertEquals(1, scheduler.jobQueuesManager.
- getRunningJobQueue("default").size());
- assertEquals(1, scheduler.jobQueuesManager.
- getWaitingJobQueue("default").size());
+ assertTrue("Submitted jobs and Returned jobs are not same",
+ subJobsList.get("u1").containsAll(jobs));
}
//Basic test to test GC allocation across the queues which have no
@@ -970,13 +1081,16 @@
// u1 finishes a task
taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
// u1 submits a few more jobs
- submitJob(JobStatus.PREP, 10, 10, null, "u1");
- submitJob(JobStatus.PREP, 10, 10, null, "u1");
- submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ // All the jobs are initialized when submitted: because of the
+ // addition of the eager job initializer, all jobs in this
+ // case would be initialized.
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// u2 also submits a job
- submitJob(JobStatus.PREP, 10, 10, null, "u2");
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
// now u3 submits a job
- submitJob(JobStatus.PREP, 2, 2, null, "u3");
+ submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
// next slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
@@ -1013,7 +1127,7 @@
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
// now submit a job to q2
- FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// update our structures
scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
@@ -1070,7 +1184,7 @@
// at this point, q3 is running 5 tasks (with a cap of 2), q4 is
// running 3 tasks (with a cap of 1).
// If we submit a job to 'default', we need to get 3 slots back.
- FakeJobInProgress j4 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// update our structures
scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
@@ -1184,4 +1298,287 @@
return tasks.get(0);
}
+ /*
+ * Test cases for Job Initialization poller.
+ */
+
+ /*
+ * This test verifies that the correct number of jobs for
+ * correct number of users is initialized.
+ * It also verifies that as jobs of users complete, new jobs
+ * from the correct users are initialized.
+ */
+ public void testJobInitialization() throws Exception {
+ // set up the scheduler
+ String[] qs = { "default" };
+ taskTrackerManager = new FakeTaskTrackerManager(1, 1);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues());
+ scheduler.setInitializationPoller(p);
+ scheduler.start();
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+
+ // submit 4 jobs each for 3 users.
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs = submitJobs(3,
+ 4, "default");
+
+ // get the jobs submitted.
+ ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
+ ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
+ ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
+
+ // reference to the initializedJobs data structure
+ // changes are reflected in the set as they are made by the poller
+ HashSet<JobID> initializedJobs = initPoller.getInitializedJobList();
+
+ // we should have 12 (3 x 4) jobs in the job queue
+ assertEquals(mgr.getJobs("default").size(), 12);
+
+ // run one poller iteration.
+ p.selectJobsToInitialize();
+
+ // the poller should initialize 6 jobs
+ // 3 users and 2 jobs from each
+ assertEquals(initializedJobs.size(), 6);
+
+ assertTrue("Initialized jobs didnt contain the user1 job 1",
+ initializedJobs.contains(u1Jobs.get(0).getJobID()));
+ assertTrue("Initialized jobs didnt contain the user1 job 2",
+ initializedJobs.contains(u1Jobs.get(1).getJobID()));
+ assertTrue("Initialized jobs didnt contain the user2 job 1",
+ initializedJobs.contains(u2Jobs.get(0).getJobID()));
+ assertTrue("Initialized jobs didnt contain the user2 job 2",
+ initializedJobs.contains(u2Jobs.get(1).getJobID()));
+ assertTrue("Initialized jobs didnt contain the user3 job 1",
+ initializedJobs.contains(u3Jobs.get(0).getJobID()));
+ assertTrue("Initialized jobs didnt contain the user3 job 2",
+ initializedJobs.contains(u3Jobs.get(1).getJobID()));
+
+ // now submit one more job from another user.
+ FakeJobInProgress u4j1 =
+ submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+
+ // run the poller again.
+ p.selectJobsToInitialize();
+
+ // since no jobs have started running, there should be no
+ // change to the initialized jobs.
+ assertEquals(initializedJobs.size(), 6);
+ assertFalse("Initialized jobs contains user 4 jobs",
+ initializedJobs.contains(u4j1.getJobID()));
+
+ // This event simulates raising the event on completion of setup task
+ // and moves the job to the running list for the scheduler to pick up.
+ raiseStatusChangeEvents(mgr);
+
+ // get some tasks assigned.
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ Task t3 = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ Task t4 = checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(0));
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(0));
+ taskTrackerManager.finishTask("tt2", t3.getTaskID().toString(), u1Jobs.get(1));
+ taskTrackerManager.finishTask("tt2", t4.getTaskID().toString(), u1Jobs.get(1));
+
+ // as some jobs have running tasks, the poller will now
+ // pick up new jobs to initialize.
+ p.selectJobsToInitialize();
+
+ // count should still be the same
+ assertEquals(initializedJobs.size(), 6);
+
+ // new jobs that have got into the list
+ assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
+ assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
+ raiseStatusChangeEvents(mgr);
+
+ // the first two jobs are done, no longer in the initialized list.
+ assertFalse("Initialized jobs contains the user1 job 1",
+ initializedJobs.contains(u1Jobs.get(0).getJobID()));
+ assertFalse("Initialized jobs contains the user1 job 2",
+ initializedJobs.contains(u1Jobs.get(1).getJobID()));
+
+ // finish one more job
+ t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ t2 = checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(2));
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(2));
+
+ // no new jobs should be picked up, because max user limit
+ // is still 3.
+ p.selectJobsToInitialize();
+
+ assertEquals(initializedJobs.size(), 5);
+
+ // run 1 more job.
+ t1 = checkAssignment("tt1", "attempt_test_0004_m_000001_0 on tt1");
+ t1 = checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(3));
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(3));
+
+ // Now initialised jobs should contain user 4's job, as
+ // user 1's jobs are all done and the number of users is
+ // below the limit
+ p.selectJobsToInitialize();
+ assertEquals(initializedJobs.size(), 5);
+ assertTrue(initializedJobs.contains(u4j1.getJobID()));
+
+ p.stopRunning();
+ }
+
+ /*
+ * testHighPriorityJobInitialization() shows behaviour when high priority job
+ * is submitted into a queue and how initialisation happens for the same.
+ */
+ public void testHighPriorityJobInitialization() throws Exception {
+ String[] qs = { "default"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues());
+ scheduler.setInitializationPoller(p);
+ scheduler.start();
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+ HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+
+ // submit 3 jobs for 3 users
+ submitJobs(3,3,"default");
+ p.selectJobsToInitialize();
+ assertEquals(initializedJobsList.size(), 6);
+
+ // submit 2 job for a different user. one of them will be made high priority
+ FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+ FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+
+ p.selectJobsToInitialize();
+
+ // shouldn't change
+ assertEquals(initializedJobsList.size(), 6);
+
+ assertFalse("Contains U4J1 high priority job " ,
+ initializedJobsList.contains(u4j1.getJobID()));
+ assertFalse("Contains U4J2 Normal priority job " ,
+ initializedJobsList.contains(u4j2.getJobID()));
+
+ // change priority of one job
+ taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
+
+ p.selectJobsToInitialize();
+
+ // the high priority job should get initialized, but not the
+ // low priority job from u4, as we have already exceeded the
+ // limit.
+ assertEquals(initializedJobsList.size(), 7);
+ assertTrue("Does not contain U4J1 high priority job " ,
+ initializedJobsList.contains(u4j1.getJobID()));
+ assertFalse("Contains U4J2 Normal priority job " ,
+ initializedJobsList.contains(u4j2.getJobID()));
+ p.stopRunning();
+ }
+
+ public void testJobMovement() throws Exception {
+ String[] qs = { "default"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues());
+ scheduler.setInitializationPoller(p);
+ scheduler.start();
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+ HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+
+ // submit a job
+ FakeJobInProgress job =
+ submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+ p.selectJobsToInitialize();
+
+ assertEquals(initializedJobsList.size(), 1);
+
+ // make it running.
+ raiseStatusChangeEvents(mgr);
+
+ // it should be there in both the queues.
+ assertTrue("Job not present in Job Queue",
+ mgr.getJobs("default").contains(job));
+ assertTrue("Job not present in Running Queue",
+ mgr.getRunningJobQueue("default").contains(job));
+
+ // assign a task
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+
+ p.selectJobsToInitialize();
+
+ // now this job should be removed from the initialized list.
+ assertTrue(initializedJobsList.isEmpty());
+
+ // the job should also be removed from the job queue as tasks
+ // are scheduled
+ assertFalse("Job present in Job Queue",
+ mgr.getJobs("default").contains(job));
+
+ // complete tasks and job
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job);
+ taskTrackerManager.finalizeJob(job);
+
+ // make sure it is removed from the run queue
+ assertFalse("Job present in running queue",
+ mgr.getRunningJobQueue("default").contains(job));
+ }
+
+ private void raiseStatusChangeEvents(JobQueuesManager mgr) {
+ Collection<JobInProgress> jips = mgr.getJobs("default");
+ for(JobInProgress jip : jips) {
+ if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
+ JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,
+ EventType.RUN_STATE_CHANGED,jip.getStatus());
+ mgr.jobUpdated(evt);
+ }
+ }
+ }
+
+ private HashMap<String, ArrayList<FakeJobInProgress>> submitJobs(
+ int numberOfUsers, int numberOfJobsPerUser, String queue)
+ throws Exception{
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+ new HashMap<String, ArrayList<FakeJobInProgress>>();
+ for (int i = 1; i <= numberOfUsers; i++) {
+ String user = String.valueOf("u" + i);
+ ArrayList<FakeJobInProgress> jips = new ArrayList<FakeJobInProgress>();
+ for (int j = 1; j <= numberOfJobsPerUser; j++) {
+ jips.add(submitJob(JobStatus.PREP, 1, 1, queue, user));
+ }
+ userJobs.put(user, jips);
+ }
+ return userJobs;
+
+ }
+
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Fri Nov 28 02:32:01 2008
@@ -52,11 +52,13 @@
new String[] { "guaranteed-capacity",
"reclaim-time-limit",
"supports-priority",
- "minimum-user-limit-percent" },
+ "minimum-user-limit-percent",
+ "maximum-initialized-jobs-per-user"},
new String[] { "100",
"300",
"false",
- "100" }
+ "100",
+ "2" }
);
}
@@ -86,22 +88,26 @@
new String[] { "guaranteed-capacity",
"reclaim-time-limit",
"supports-priority",
- "minimum-user-limit-percent" },
+ "minimum-user-limit-percent",
+ "maximum-initialized-jobs-per-user"},
new String[] { "10",
"600",
"true",
- "25" }
+ "25",
+ "4"}
);
Map<String, String> q2Props = setupQueueProperties(
new String[] { "guaranteed-capacity",
"reclaim-time-limit",
"supports-priority",
- "minimum-user-limit-percent" },
+ "minimum-user-limit-percent",
+ "maximum-initialized-jobs-per-user"},
new String[] { "100",
"6000",
"false",
- "50" }
+ "50",
+ "1"}
);
startConfig();
@@ -139,6 +145,7 @@
}
expProperties.put("reclaim-time-limit", "300");
expProperties.put("supports-priority", "false");
+ expProperties.put("maximum-initialized-jobs-per-user", "2");
queueDetails.put("default", expProperties);
checkQueueProperties(testConf, queueDetails);
}