You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/11/28 11:32:01 UTC

svn commit: r721415 - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/

Author: yhemanth
Date: Fri Nov 28 02:32:01 2008
New Revision: 721415

URL: http://svn.apache.org/viewvc?rev=721415&view=rev
Log:
HADOOP-4513. Initialize jobs asynchronously in the capacity scheduler. Contributed by Sreekanth Ramakrishnan.

Added:
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/capacity-scheduler.xml.template
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Nov 28 02:32:01 2008
@@ -144,6 +144,9 @@
     HADOOP-2774. Add counters tracking records spilled to disk in MapTask and
     ReduceTask. (Ravi Gummadi via cdouglas)
 
+    HADOOP-4513. Initialize jobs asynchronously in the capacity scheduler.
+    (Sreekanth Ramakrishnan via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Fri Nov 28 02:32:01 2008
@@ -46,6 +46,13 @@
     value of 100 implies no user limits are imposed. 
     </description>
   </property>
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.maximum-initialized-jobs-per-user</name>
+    <value>2</value>
+    <description>The maximum number of jobs to be pre-initialized for a user
+    of the job queue.
+    </description>
+  </property>
   
   <!-- The default configuration settings for the capacity task scheduler -->
   <!-- The default values would be applied to all the queues which don't have -->
@@ -74,4 +81,35 @@
       for the job queue at any given point of time by default.
     </description>
   </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user</name>
+    <value>2</value>
+    <description>The default maximum number of jobs to be pre-initialized for
+    a user of a job queue, used when the queue does not set its own value.
+    </description>
+  </property>
+
+
+  <!-- Capacity scheduler Job Initialization configuration parameters -->
+  <property>
+    <name>mapred.capacity-scheduler.init-poll-interval</name>
+    <value>5000</value>
+    <description>The amount of time in milliseconds which is used to poll
+    the job queues for jobs to initialize.
+    </description>
+  </property>
+  <property>
+    <name>mapred.capacity-scheduler.init-worker-threads</name>
+    <value>5</value>
+    <description>Number of worker threads used by the initialization
+    poller to initialize jobs in the job queues.
+    If this number is equal to the number of job queues, each queue
+    is served by its own thread. If it is smaller, each thread is
+    assigned a set of queues. If it is greater, the number of
+    threads actually used is equal to the number of
+    job queues.
+    </description>
+  </property>
+
 </configuration>

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Nov 28 02:32:01 2008
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -46,6 +44,8 @@
     "mapred.capacity-scheduler.queue.";
 
   private Configuration rmConf;
+
+  private int defaultMaxJobsPerUsersToInitialize;
   
   /**
    * Create a new ResourceManagerConf.
@@ -83,6 +83,9 @@
         "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
     defaultSupportPriority = rmConf.getBoolean(
         "mapred.capacity-scheduler.default-supports-priority", false);
+    defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
+        "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
+        2);
   }
   
   /**
@@ -247,5 +250,102 @@
                                                   String property) {
       return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
+
+  /**
+   * Gets the maximum number of jobs which are allowed to be initialized
+   * for a user in the job queue.
+   * 
+   * @param queue queue name.
+   * @return maximum number of jobs allowed to be initialized per user.
+   * @throws IllegalArgumentException if the configured maximum number of
+   * jobs per user is negative or zero.
+   */
+  public int getMaxJobsPerUserToInitialize(String queue) {
+    int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
+        "maximum-initialized-jobs-per-user"), 
+        defaultMaxJobsPerUsersToInitialize);
+    if(maxJobsPerUser <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid maximum jobs per user configuration " + maxJobsPerUser);
+    }
+    return maxJobsPerUser;
+  }
   
+  /**
+   * Sets the maximum number of jobs which are allowed to be initialized 
+   * for a user in the queue.
+   * 
+   * @param queue queue name.
+   * @param value maximum number of jobs allowed to be initialized per user.
+   */
+  public void setMaxJobsPerUserToInitialize(String queue, int value) {
+    rmConf.setInt(toFullPropertyName(queue, 
+        "maximum-initialized-jobs-per-user"), value);
+  }
+
+  /**
+   * Amount of time in milliseconds which poller thread and initialization
+   * thread would sleep before looking at the queued jobs.
+   *  
+   * @return time in milliseconds.
+   * @throws IllegalArgumentException if time is negative or zero.
+   */
+  public long getSleepInterval() {
+    long sleepInterval = rmConf.getLong(
+        "mapred.capacity-scheduler.init-poll-interval", -1);
+    
+    if(sleepInterval <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid initializater poller interval " + sleepInterval);
+    }
+    
+    return sleepInterval;
+  }
+
+  /**
+   * Gets the maximum number of threads which are spawned to initialize jobs
+   * in the job queues in parallel. The number of threads should always be
+   * less than or equal to the number of job queues present.
+   * 
+   * If number of threads is configured to be more than job queues present,
+   * then number of job queues is used as number of threads used for initializing
+   * jobs.
+   * 
+   * So a given thread can have responsibility of initializing jobs from more 
+   * than one queue.
+   * 
+   * @return maximum number of threads spawned to initialize jobs in job queue
+   * in parallel.
+   */
+  public int getMaxWorkerThreads() {
+    int maxWorkerThreads = rmConf.getInt(
+        "mapred.capacity-scheduler.init-worker-threads", 0);
+    if(maxWorkerThreads <= 0) {
+      throw new IllegalArgumentException(
+          "Invalid initializater worker thread number " + maxWorkerThreads);
+    }
+    return maxWorkerThreads;
+  }
+  /**
+   * Set the sleep interval which initialization poller would sleep before 
+   * it looks at the jobs in the job queue.
+   * 
+   * @param interval sleep interval
+   */
+  public void setSleepInterval(long interval) {
+    rmConf.setLong(
+        "mapred.capacity-scheduler.init-poll-interval", interval);
+  }
+  
+  /**
+   * Sets number of threads which can be spawned to initialize jobs in
+   * parallel.
+   * 
+   * @param poolSize number of threads to be spawned to initialize jobs
+   * in parallel.
+   */
+  public void setMaxWorkerThreads(int poolSize) {
+    rmConf.setInt(
+        "mapred.capacity-scheduler.init-worker-threads", poolSize);
+  }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Nov 28 02:32:01 2008
@@ -534,16 +534,7 @@
            * consider the first few jobs per user.
            */ 
         }
-        // update stats on waiting jobs
-        for (JobInProgress j: 
-          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
-          // pending tasks
-          if (qsi.numPendingTasks > getClusterCapacity()) {
-            // that's plenty. no need for more computation
-            break;
-          }
-          qsi.numPendingTasks += getPendingTasks(j);
-        }
+        //TODO do we need to update stats on waiting jobs
       }
     }
 
@@ -665,26 +656,7 @@
         }
       }
       
-      // if we're here, we found nothing in the running jobs. Time to 
-      // look at waiting jobs. Get first job of a user that is not over limit
-      for (JobInProgress j: 
-        scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
-        // is this job's user over limit?
-        if (usersOverLimit.contains(j.getProfile().getUser())) {
-          // user over limit. 
-          continue;
-        }
-        // this job is a candidate for running. Initialize it, move it
-        // to run queue
-        j.initTasks();
-        // We found a suitable job. Get task from it.
-        t = obtainNewTask(taskTracker, j);
-        if (t != null) {
-          LOG.debug("Getting task from job " + 
-                    j.getJobID() + " in queue " + qsi.queueName);
-          return t;
-        }
-      }
+
       
       // if we're here, we haven't found anything. This could be because 
       // there is nothing to run, or that the user limit for some user is 
@@ -705,19 +677,6 @@
             }
           }
         }
-        // look at waiting jobs the same way
-        for (JobInProgress j: 
-          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) {
-          if (usersOverLimit.contains(j.getProfile().getUser())) {
-            j.initTasks();
-            t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              LOG.debug("Getting task from job " + 
-                        j.getJobID() + " in queue " + qsi.queueName);
-              return t;
-            }
-          }
-        }
       }
       
       return null;
@@ -766,12 +725,10 @@
       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
         Collection<JobInProgress> runJobs = 
           scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
-        Collection<JobInProgress> waitJobs = 
-          scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName);
         s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
             qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity + 
             ", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
-            ", wait jobs=" + waitJobs.size() + "*** ");
+            "*** ");
       }
       LOG.debug(s);
     }
@@ -965,8 +922,9 @@
     }
   }
   private Clock clock;
+  private JobInitializationPoller initializationPoller;
+
 
-  
   public CapacityTaskScheduler() {
     this(new Clock());
   }
@@ -1053,6 +1011,14 @@
     // listen to job changes
     taskTrackerManager.addJobInProgressListener(jobQueuesManager);
 
+    //Start thread for initialization
+    if (initializationPoller == null) {
+      this.initializationPoller = new JobInitializationPoller(
+          jobQueuesManager,rmConf,queues);
+    }
+    initializationPoller.init(queueManager.getQueues(), rmConf);
+    initializationPoller.setDaemon(true);
+    initializationPoller.start();
     // start thread for redistributing capacity
     this.reclaimCapacityThread = 
       new Thread(new ReclaimCapacity(),"reclaimCapacity");
@@ -1061,6 +1027,11 @@
     LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
   }
   
+  /** mostly for testing purposes */
+  void setInitializationPoller(JobInitializationPoller p) {
+    this.initializationPoller = p;
+  }
+  
   @Override
   public synchronized void terminate() throws IOException {
     if (!started) return;
@@ -1071,6 +1042,7 @@
     // tell the reclaim thread to stop
     stopReclaim = true;
     started = false;
+    initializationPoller.terminate();
     super.terminate();
   }
   
@@ -1172,13 +1144,20 @@
       jobCollection.addAll(runningJobs);
     }
     Collection<JobInProgress> waitingJobs = 
-      jobQueuesManager.getWaitingJobQueue(queueName);
+      jobQueuesManager.getJobs(queueName);
+    Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
     if(waitingJobs != null) {
-      jobCollection.addAll(waitingJobs);
+      tempCollection.addAll(waitingJobs);
+    }
+    tempCollection.removeAll(runningJobs);
+    if(!tempCollection.isEmpty()) {
+      jobCollection.addAll(tempCollection);
     }
     return jobCollection;
   }
-
   
+  JobInitializationPoller getInitializationPoller() {
+    return initializationPoller;
+  }
 }
 

Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=721415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Fri Nov 28 02:32:01 2008
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class asynchronously initializes jobs submitted to the
+ * Map/Reduce cluster running with the {@link CapacityTaskScheduler}.
+ *
+ * <p>
+ * The class consists of a main poller thread, and a set of worker
+ * threads that together initialize the jobs. The poller thread periodically
+ * looks at jobs submitted to the scheduler, and selects a set of them
+ * to be initialized. It passes these to the worker threads for initializing.
+ * Each worker thread is configured to look at jobs submitted to a fixed
+ * set of queues. It initializes jobs in a round robin manner - selecting
+ * the first job in order from each queue ready to be initialized.
+ * </p>
+ * 
+ * <p>
+ * An initialized job occupies memory resources on the Job Tracker. Hence,
+ * the poller limits the number of jobs initialized at any given time to
+ * a configured limit. The limit is specified per user per queue.
+ * </p>
+ * 
+ * <p>
+ * However, since a job needs to be initialized before the scheduler can
+ * select tasks from it to run, the poller tries to keep a backlog of jobs
+ * initialized so the scheduler does not need to wait and let empty slots
+ * go to waste. The core logic of the poller is to pick up the right jobs,
+ * which have a good potential to be run next by the scheduler. To do this,
+ * it picks up jobs submitted across users and across queues to account
+ * both for guaranteed capacities and user limits. It also always initializes
+ * high priority jobs, whenever they need to be initialized, even if this
+ * means going over the limit for initialized jobs.
+ * </p>
+ */
+public class JobInitializationPoller extends Thread {
+
+  private static final Log LOG = LogFactory
+      .getLog(JobInitializationPoller.class.getName());
+
+  /*
+   * The poller picks up jobs across users to initialize based on user limits.
+   * Suppose the user limit for a queue is 25%, it means at most 4 users' jobs
+   * can run together. However, in order to account for jobs from a user that
+   * might complete faster than others, it initializes jobs from an additional
+   * number of users as a backlog. This variable defines the additional
+   * number of users whose jobs can be considered for initializing. 
+   */
+  private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;
+
+  private JobQueuesManager jobQueueManager;
+  private long sleepInterval;
+  private int poolSize;
+
+  /**
+   * A worker thread that initializes jobs in one or more queues assigned to
+   * it.
+   *
+   * Jobs are initialized in a round robin fashion one from each queue at a
+   * time.
+   */
+  class JobInitializationThread extends Thread {
+
+    private JobInProgress initializingJob;
+
+    private volatile boolean startIniting;
+    private AtomicInteger currentJobCount = new AtomicInteger(0); // number of jobs to initialize
+
+    /**
+     * The hash map which maintains relationship between queue to jobs to
+     * initialize per queue.
+     */
+    private HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>> jobsPerQueue;
+
+    public JobInitializationThread() {
+      startIniting = true;
+      jobsPerQueue = new HashMap<String, TreeMap<JobSchedulingInfo, JobInProgress>>();
+    }
+
+    @Override
+    public void run() {
+      while (startIniting) {
+        initializeJobs();  
+        try {
+          if (startIniting) {
+            Thread.sleep(sleepInterval);
+          } else {
+            break;
+          }
+        } catch (Throwable t) {
+        }
+      }
+    }
+
+    // The key method that initializes jobs from queues
+    // This method is package-private to allow test cases to call it
+    // synchronously in a controlled manner.
+    void initializeJobs() {
+      // while there are more jobs to initialize...
+      while (currentJobCount.get() > 0) {
+        Set<String> queues = jobsPerQueue.keySet();
+        for (String queue : queues) {
+          JobInProgress job = getFirstJobInQueue(queue);
+          if (job == null) {
+            continue;
+          }
+          LOG.info("Initializing job : " + job.getJobID() + " in Queue "
+              + job.getProfile().getQueueName() + " For user : "
+              + job.getProfile().getUser());
+          try {
+            if (startIniting) {
+              setInitializingJob(job);
+              job.initTasks();
+              setInitializingJob(null);
+            } else {
+              break;
+            }
+          } catch (Throwable t) {
+            LOG.info("Job initialization failed:\n"
+                + StringUtils.stringifyException(t));
+            if (job != null)
+              job.fail();
+          }
+        }
+      }
+    }
+
+    /**
+     * Removes the first job from the given queue and returns it.
+     * 
+     * @param queue
+     *          queue name
+     * @return the removed first job, or null if the queue is empty.
+     */
+    private JobInProgress getFirstJobInQueue(String queue) {
+      TreeMap<JobSchedulingInfo, JobInProgress> jobsList = jobsPerQueue
+          .get(queue);
+      synchronized (jobsList) {
+        if (jobsList.isEmpty()) {
+          return null;
+        }
+        Iterator<JobInProgress> jobIterator = jobsList.values().iterator();
+        JobInProgress job = jobIterator.next();
+        jobIterator.remove();
+        currentJobCount.getAndDecrement();
+        return job;
+      }
+    }
+
+    /*
+     * Test method to check if the thread is currently initialising the job
+     */
+    synchronized JobInProgress getInitializingJob() {
+      return this.initializingJob;
+    }
+    
+    synchronized void setInitializingJob(JobInProgress job) {
+      this.initializingJob  = job;
+    }
+
+    void terminate() {
+      startIniting = false;
+    }
+
+    void addJobsToQueue(String queue, JobInProgress job) {
+      TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue
+          .get(queue);
+      if (jobs == null) {
+        LOG.error("Invalid queue passed to the thread : " + queue
+            + " For job :: " + job.getJobID());
+      }
+      synchronized (jobs) {
+        JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
+        jobs.put(schedInfo, job);
+        currentJobCount.getAndIncrement();
+      }
+    }
+
+    void addQueue(String queue) {
+      TreeMap<JobSchedulingInfo, JobInProgress> jobs = new TreeMap<JobSchedulingInfo, JobInProgress>(
+          jobQueueManager.getComparator(queue));
+      jobsPerQueue.put(queue, jobs);
+    }
+  }
+
+  /**
+   * The queue information class maintains the following information per
+   * queue: the maximum number of users allowed to initialize jobs in the
+   * queue, and the maximum number of jobs allowed to be initialized per user.
+   * 
+   */
+  private class QueueInfo {
+    String queue;
+    int maxUsersAllowedToInitialize;
+    int maxJobsPerUserToInitialize;
+
+    public QueueInfo(String queue, int maxUsersAllowedToInitialize,
+        int maxJobsPerUserToInitialize) {
+      this.queue = queue;
+      this.maxJobsPerUserToInitialize = maxJobsPerUserToInitialize;
+      this.maxUsersAllowedToInitialize = maxUsersAllowedToInitialize;
+    }
+  }
+
+  /**
+   * Map from queue name to the configuration used for initializing jobs
+   * associated with that particular job queue.
+   */
+  private HashMap<String, QueueInfo> jobQueues;
+
+  /**
+   * Set of IDs of jobs which have been passed to initialization threads.
+   * This is maintained so that we don't call initTasks() for the same job twice.
+   */
+  private HashSet<JobID> initializedJobs;
+
+  private volatile boolean running;
+
+  /**
+   * The map which provides information which thread should be used to
+   * initialize jobs for a given job queue.
+   */
+  private HashMap<String, JobInitializationThread> threadsToQueueMap;
+
+  public JobInitializationPoller(JobQueuesManager mgr,
+      CapacitySchedulerConf rmConf, Set<String> queue) {
+    initializedJobs = new HashSet<JobID>();
+    jobQueues = new HashMap<String, QueueInfo>();
+    this.jobQueueManager = mgr;
+    threadsToQueueMap = new HashMap<String, JobInitializationThread>();
+    super.setName("JobInitializationPollerThread");
+    running = true;
+  }
+
+  /*
+   * method to read all configuration values required by the initialisation
+   * poller
+   */
+
+  void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
+    for (String queue : queues) {
+      int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
+      int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
+      int maxJobsPerUserToInitialize = capacityConf
+          .getMaxJobsPerUserToInitialize(queue);
+      QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
+          maxJobsPerUserToInitialize);
+      jobQueues.put(queue, qi);
+    }
+    sleepInterval = capacityConf.getSleepInterval();
+    poolSize = capacityConf.getMaxWorkerThreads();
+    if (poolSize > queues.size()) {
+      poolSize = queues.size();
+    }
+    assignThreadsToQueues();
+    Collection<JobInitializationThread> threads = threadsToQueueMap.values();
+    for (JobInitializationThread t : threads) {
+      if (!t.isAlive()) {
+        t.setDaemon(true);
+        t.start();
+      }
+    }
+  }
+
+  public void run() {
+    while (running) {
+      try {
+        selectJobsToInitialize();
+        if (!this.isInterrupted()) {
+          Thread.sleep(sleepInterval);
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Job Initialization poller interrupted"
+            + StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  // The key method that picks up jobs to initialize for each queue.
+  // The jobs picked up are added to the worker thread that is handling
+  // initialization for that queue.
+  // The method is package private to allow tests to call it synchronously
+  // in a controlled manner.
+  void selectJobsToInitialize() {
+    for (String queue : jobQueues.keySet()) {
+      ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
+      //if (LOG.isDebugEnabled()) {
+        printJobs(jobsToInitialize);
+      //}
+      JobInitializationThread t = threadsToQueueMap.get(queue);
+      for (JobInProgress job : jobsToInitialize) {
+        t.addJobsToQueue(queue, job);
+      }
+    }
+  }
+
+  private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
+    for (JobInProgress job : jobsToInitialize) {
+      LOG.info("Passing to Initializer Job Id :" + job.getJobID()
+          + " User: " + job.getProfile().getUser() + " Queue : "
+          + job.getProfile().getQueueName());
+    }
+  }
+
+  // This method exists to be overridden by test cases that wish to
+  // create a test-friendly worker thread which can be controlled
+  // synchronously.
+  JobInitializationThread createJobInitializationThread() {
+    return new JobInitializationThread();
+  }
+  
+  private void assignThreadsToQueues() {
+    int countOfQueues = jobQueues.size();
+    String[] queues = (String[]) jobQueues.keySet().toArray(
+        new String[countOfQueues]);
+    int numberOfQueuesPerThread = countOfQueues / poolSize;
+    int numberOfQueuesAssigned = 0;
+    for (int i = 0; i < poolSize; i++) {
+      JobInitializationThread initializer = createJobInitializationThread();
+      int batch = (i * numberOfQueuesPerThread);
+      for (int j = batch; j < (batch + numberOfQueuesPerThread); j++) {
+        initializer.addQueue(queues[j]);
+        threadsToQueueMap.put(queues[j], initializer);
+        numberOfQueuesAssigned++;
+      }
+    }
+
+    if (numberOfQueuesAssigned < countOfQueues) {
+      // Assign each remaining queue to a thread that already serves an earlier queue (taken in order)
+      int startIndex = 0;
+      for (int i = numberOfQueuesAssigned; i < countOfQueues; i++) {
+        JobInitializationThread t = threadsToQueueMap
+            .get(queues[startIndex]);
+        t.addQueue(queues[i]);
+        threadsToQueueMap.put(queues[i], t);
+        startIndex++;
+      }
+    }
+  }
+
+  /*
+   * Select jobs to be initialized for a given queue.
+   * 
+   * The jobs are selected such that they are within the limits
+   * for number of users and number of jobs per user in the queue.
+   * The only exception is if high priority jobs are waiting to be
+   * initialized. In that case, we could exceed the configured limits.
+   * However, we try to restrict the excess to a minimum.
+   */
+  ArrayList<JobInProgress> getJobsToInitialize(String queue) {
+    QueueInfo qi = jobQueues.get(queue);
+    ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
+    // use the configuration parameter which is configured for the particular
+    // queue.
+    int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
+    int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
+    // calculate maximum number of jobs which can be allowed to initialize
+    // for this queue.
+    // This value is used when a user submits a high priority job after we
+    // have initialized jobs for that queue and none of them is scheduled.
+    // This would prevent us from initializing extra jobs for that particular
+    // user. Explanation given at end of method.
+    int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
+        * maxJobsPerUserAllowedToInitialize;
+    Collection<JobInProgress> jobs = jobQueueManager.getJobs(queue);
+    int countOfJobsInitialized = 0;
+    HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
+    for (JobInProgress job : jobs) {
+      /*
+       * First check if the job has been scheduled, completed or killed. If so,
+       * remove it from the set of initialized jobs and from the job queue.
+       */
+      if ((job.getStatus().getRunState() == JobStatus.RUNNING)
+          && (job.runningMaps() > 0 || job.runningReduces() > 0
+              || job.finishedMaps() > 0 || job.finishedReduces() > 0)) {
+        LOG.debug("Removing from the queue " + job.getJobID());
+        initializedJobs.remove(job.getJobID());
+        jobQueueManager.removeJobFromQueue(job);
+        continue;
+      } else if (job.isComplete()) {
+        LOG.debug("Removing from completed job from " + "the queue "
+            + job.getJobID());
+        initializedJobs.remove(job.getJobID());
+        jobQueueManager.removeJobFromQueue(job);
+        continue;
+      }
+      String user = job.getProfile().getUser();
+      int numberOfJobs = userJobsInitialized.get(user) == null ? 0
+          : userJobsInitialized.get(user);
+      // If the job is already initialized then add the count against user
+      // then continue.
+      if (initializedJobs.contains(job.getJobID())) {
+        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
+        countOfJobsInitialized++;
+        continue;
+      }
+      boolean isUserPresent = userJobsInitialized.containsKey(user);
+      /*
+       * If the user is not present in the user list and the size of the user
+       * list is less than the maximum number of users allowed to initialize,
+       * then initialize this job and add this user to the user list.
+       * 
+       * Else if the user is present, we check that the user's number of jobs
+       * has not crossed the per-user quota and the global per-queue quota.
+       * 
+       * The logic behind using a global per queue job can be understood by example
+       * below: Consider 3 users submitting normal priority job in a job queue with
+       * user limit as 100. (Max jobs per user = 2)
+       * 
+       * U1J1,U1J2,U1J3....,U3J3.
+       * 
+       * Jobs initialized would be
+       * 
+       * U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
+       * 
+       * Now consider a case where U4 comes in and submits a high priority job.
+       * 
+       * U4J1 --- High Priority JOb, U4J2---- Normal priority job.
+       * 
+       * So, if we dont use global per queue value we would end up initializing both
+       * U4 jobs which is not correct.
+       * 
+       * By using a global value we ensure that we dont initialize any extra jobs
+       * for a user.
+       */
+      if (!isUserPresent
+          && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
+        // this is a new user being considered and the number of users
+        // is within limits.
+        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
+        jobsToInitialize.add(job);
+        initializedJobs.add(job.getJobID());
+        countOfJobsInitialized++;
+      } else if (isUserPresent
+          && numberOfJobs < maxJobsPerUserAllowedToInitialize
+          && countOfJobsInitialized < maxJobsPerQueueToInitialize) {
+        /*
+         * this is an existing user and the number of jobs per user
+         * is within limits, as also the number of jobs per queue.
+         * We need the check on number of jobs per queue to restrict
+         * the number of jobs we initialize over the limit due to high
+         * priority jobs.
+         * 
+         * For e.g Consider 3 users submitting normal priority job in 
+         * a job queue with user limit as 100 and max jobs per user as 2
+         * Say the jobs are U1J1,U1J2,U1J3....,U3J3.
+         * 
+         * Jobs initialized would be U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
+         * 
+         * Now consider a case where U4 comes in and submits a high priority job
+         * and a normal priority job. Say U4J1 and U4J2
+         * 
+         * If we dont consider the number of jobs per queue we would end up 
+         * initializing both jobs from U4. Initializing the second job is
+         * unnecessary.
+         */
+        userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
+        jobsToInitialize.add(job);
+        initializedJobs.add(job.getJobID());
+        countOfJobsInitialized++;
+      }
+    }
+    return jobsToInitialize;
+  }
+
+  /*
+   * Clear the running flag, then stop and interrupt each live worker thread.
+   */
+  void terminate() {
+    running = false;
+    for (JobInitializationThread worker : threadsToQueueMap.values()) {
+      if (worker.isAlive()) {
+        worker.terminate();
+        worker.interrupt();
+      }
+    }
+  }
+
+  /*
+   * Test-only accessor.
+   * 
+   * Looks up the initialization thread mapped to the given queue and returns
+   * the job that thread reports as currently initializing. Returns null when
+   * no thread is mapped to the queue.
+   */
+  JobInProgress getInitializingJob(String queue) {
+    JobInitializationThread worker = threadsToQueueMap.get(queue);
+    return (worker == null) ? null : worker.getInitializingJob();
+  }
+
+  HashSet<JobID> getInitializedJobList() {
+    return initializedJobs; // the poller's own set is returned, not a copy
+  }
+}

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Fri Nov 28 02:32:01 2008
@@ -18,8 +18,10 @@
 package org.apache.hadoop.mapred;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -62,27 +64,62 @@
 
     // whether the queue supports priorities
     boolean supportsPriorities;
-    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
+    Map<JobSchedulingInfo, JobInProgress> jobList; // for waiting jobs
     Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
     
+    public Comparator<JobSchedulingInfo> comparator;
+    
     QueueInfo(boolean prio) {
       this.supportsPriorities = prio;
       if (supportsPriorities) {
         // use the default priority-aware comparator
-        this.waitingJobs = 
-          new TreeMap<JobSchedulingInfo, JobInProgress>(
-              JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
-        this.runningJobs = 
-          new TreeMap<JobSchedulingInfo, JobInProgress>(
-              JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR);
+        comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
       }
       else {
-        this.waitingJobs = 
-          new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
-        this.runningJobs = 
-          new TreeMap<JobSchedulingInfo, JobInProgress>(STARTTIME_JOB_COMPARATOR);
+        comparator = STARTTIME_JOB_COMPARATOR;
+      }
+      jobList = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+      runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+    }
+    
+    Collection<JobInProgress> getJobs() {
+      synchronized (jobList) { // snapshot copy, wrapped read-only
+        return Collections.unmodifiableCollection(
+            new LinkedList<JobInProgress>(jobList.values()));
+      }
+    }
+    
+    Collection<JobInProgress> getRunningJobs() {
+      synchronized (runningJobs) { // snapshot copy, wrapped read-only
+       return Collections.unmodifiableCollection(
+           new LinkedList<JobInProgress>(runningJobs.values())); 
       }
     }
+    
+    void addRunningJob(JobInProgress job) {
+      synchronized (runningJobs) { // key is a fresh JobSchedulingInfo(job)
+       runningJobs.put(new JobSchedulingInfo(job),job); 
+      }
+    }
+    
+    JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
+      synchronized (runningJobs) { // result is null when no entry matched
+        return runningJobs.remove(jobInfo); 
+      }
+    }
+    
+    JobInProgress removeJob(JobSchedulingInfo schedInfo) {
+      synchronized (jobList) { // result is null when no entry matched
+        return jobList.remove(schedInfo);
+      }
+    }
+    
+    void addJob(JobInProgress job) {
+      synchronized (jobList) { // key is a fresh JobSchedulingInfo(job)
+        jobList.put(new JobSchedulingInfo(job), job);
+      }
+    }
+    
   }
   
   // we maintain a hashmap of queue-names to queue info
@@ -109,14 +146,15 @@
    * Returns the queue of running jobs associated with the name
    */
   public Collection<JobInProgress> getRunningJobQueue(String queueName) {
-    return jobQueues.get(queueName).runningJobs.values();
+    return jobQueues.get(queueName).getRunningJobs();
   }
   
   /**
-   * Returns the queue of waiting jobs associated with the name
+   * Returns a snapshot of the jobs in the job list of the named queue.
+   * Jobs stay in this list until the initialization poller removes them.
    */
-  public Collection<JobInProgress> getWaitingJobQueue(String queueName) {
-    return jobQueues.get(queueName).waitingJobs.values();
+  public Collection<JobInProgress> getJobs(String queueName) {
+    return jobQueues.get(queueName).getJobs();
   }
   
   @Override
@@ -133,19 +171,21 @@
     }
     // add job to waiting queue. It will end up in the right place, 
     // based on priority. 
-    qi.waitingJobs.put(new JobSchedulingInfo(job), job);
+    qi.addJob(job);
     // let scheduler know. 
     scheduler.jobAdded(job);
   }
 
+  /*
+   * The removal of the running jobs alone is done by the JobQueueManager.
+   * The removal of the jobs in the job queue is taken care by the
+   * JobInitializationPoller.
+   */
   private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
                             QueueInfo qi) {
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
-             + job.getProfile().getQueueName() + " has completed");
-    // job could be in running or waiting queue
-    if (qi.runningJobs.remove(oldInfo) != null) {
-      qi.waitingJobs.remove(oldInfo);
-    }
+        + job.getProfile().getQueueName() + " has completed");
+    qi.removeRunningJob(oldInfo);
     // let scheduler know
     scheduler.jobCompleted(job);
   }
@@ -159,25 +199,21 @@
   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
                            QueueInfo qi) {
     
-    JobSchedulingInfo newInfo = new JobSchedulingInfo(job);
-    if (qi.waitingJobs.remove(oldInfo) == null) {
-      qi.runningJobs.remove(oldInfo);
-      // Add back to the running queue
-      qi.runningJobs.put(newInfo, job);
-    } else {
-      // Add back to the waiting queue
-      qi.waitingJobs.put(newInfo, job);
+    if(qi.removeJob(oldInfo) != null) {
+      qi.addJob(job);
+    }
+    if(qi.removeRunningJob(oldInfo) != null) {
+      qi.addRunningJob(job);
     }
   }
   
   // This is used to move a job from the waiting queue to the running queue.
   private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
                               QueueInfo qi) {
-    // Remove from the waiting queue
-    qi.waitingJobs.remove(oldInfo);
-    
+    // Removing of the job from job list is responsibility of the
+    //initialization poller.
     // Add the job to the running queue
-    qi.runningJobs.put(new JobSchedulingInfo(job), job);
+    qi.addRunningJob(job);
   }
   
   // Update the scheduler as job's state has changed
@@ -221,4 +257,13 @@
     }
   }
   
+  public void removeJobFromQueue(JobInProgress job) {
+    // Look up the job's queue by name and drop the job from its job list.
+    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    qi.removeJob(new JobSchedulingInfo(job));
+  }
+  
+  Comparator<JobSchedulingInfo> getComparator(String queue) {
+    return jobQueues.get(queue).comparator; // the one its job maps sort by
+  }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Nov 28 02:32:01 2008
@@ -23,7 +23,6 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,13 +33,109 @@
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-//import org.apache.hadoop.mapred.CapacityTaskScheduler;
 import org.apache.hadoop.conf.Configuration;
 
+
 public class TestCapacityScheduler extends TestCase {
   
   private static int jobCounter;
   
+  /**
+   * Test worker thread that removes the asynchronous nature of job
+   * initialization.
+   * 
+   * The run method is a dummy which just waits until it is stopped or
+   * interrupted. Test code is expected to call initializeJobs directly.
+   */
+  class ControlledJobInitializer extends 
+                              JobInitializationPoller.JobInitializationThread {
+    
+    boolean stopRunning;
+    
+    public ControlledJobInitializer(JobInitializationPoller p) {
+      p.super(); // qualified super call: p is the enclosing poller instance
+    }
+    
+    @Override
+    public void run() {
+      while (!stopRunning) {
+        try {
+          synchronized(this) {
+            this.wait();  
+          }
+        } catch (InterruptedException ie) {
+          break; // an interrupt ends the dummy loop
+        }
+      }
+    }
+    
+    void stopRunning() {
+      stopRunning = true; // only sets the flag; it does not wake a waiter
+    }
+  }
+  
+  /**
+   * Test poller that removes the asynchronous nature of job initialization.
+   * run() only parks until stopRunning() is called; tests are expected to
+   * call selectJobsToInitialize() directly, which then runs initializeJobs()
+   * on every worker. Workers are created as ControlledJobInitializer objects
+   * instead of objects of the actual class.
+   */
+  class ControlledInitializationPoller extends JobInitializationPoller {
+    
+    private boolean stopRunning;
+    private ArrayList<ControlledJobInitializer> workers;
+    
+    public ControlledInitializationPoller(JobQueuesManager mgr,
+                                          CapacitySchedulerConf rmConf,
+                                          Set<String> queues) {
+      super(mgr, rmConf, queues);
+    }
+    
+    @Override
+    public void run() {
+      // Flag is checked under the monitor so the wake-up cannot be lost.
+      synchronized (this) {
+        while (!stopRunning) {
+          try {
+            this.wait();
+          } catch (InterruptedException ie) {
+            break;
+          }
+        }
+      }
+    }
+    
+    @Override
+    JobInitializationThread createJobInitializationThread() {
+      ControlledJobInitializer t = new ControlledJobInitializer(this);
+      if (workers == null) {
+        workers = new ArrayList<ControlledJobInitializer>();
+      }
+      workers.add(t);
+      return t;
+    }
+
+    @Override
+    void selectJobsToInitialize() {
+      super.selectJobsToInitialize();
+      for (ControlledJobInitializer t : workers) {
+        t.initializeJobs();
+      }
+    }
+    
+    void stopRunning() {
+      synchronized (this) {
+        stopRunning = true;
+        this.notifyAll(); // wake run(), which is waiting on this monitor
+      }
+      for (ControlledJobInitializer t : workers) {
+        t.stopRunning();
+        t.interrupt();
+      }
+    }
+  }
+  
   static class FakeJobInProgress extends JobInProgress {
     
     private FakeTaskTrackerManager taskTrackerManager;
@@ -208,12 +303,18 @@
       new HashMap<String, TaskStatus>();
 
     public FakeTaskTrackerManager() {
+      this(2, 1);
+    }
+    
+    public FakeTaskTrackerManager(int maxMapSlots, int maxReduceSlots) {
       trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
           new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+          maxMapSlots, maxReduceSlots));
+      maxMapTasksPerTracker = maxMapSlots;
       trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
           new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+          maxMapSlots, maxReduceSlots));
+      maxReduceTasksPerTracker = maxReduceSlots;
     }
     
     public void addTaskTracker(String ttName) {
@@ -405,6 +506,16 @@
     public boolean isPrioritySupported(String queue) {
       return queueMap.get(queue).supportsPrio;
     }
+    
+    @Override
+    public long getSleepInterval() {
+      return 1; // constant for tests; units are defined by the overridden method
+    }
+    
+    @Override
+    public int getMaxWorkerThreads() {
+      return 1; // presumably caps the poller at one worker thread - confirm
+    }
   }
 
   protected class FakeClock extends CapacityTaskScheduler.Clock {
@@ -522,8 +633,9 @@
     scheduler.jobQueuesManager.jobUpdated(event);
     
     // check if the jobs are missing from the waiting queue
-    assertEquals("Waiting queue is garbled on job init", 0, 
-                 scheduler.jobQueuesManager.getWaitingJobQueue("default")
+    // The jobs are not removed from waiting queue until they are scheduled 
+    assertEquals("Waiting queue is garbled on job init", 2, 
+                 scheduler.jobQueuesManager.getJobs("default")
                           .size());
     
     // test if changing the job priority/start-time works as expected in the 
@@ -550,7 +662,7 @@
     // check if the running queue size is correct
     assertEquals("Job finish garbles the queue", 
                  1, rqueue.size());
-    
+
   }
   
   // test if the queue reflects the changes
@@ -607,7 +719,7 @@
   private JobInProgress[] getJobsInQueue(boolean waiting) {
     Collection<JobInProgress> queue = 
       waiting 
-      ? scheduler.jobQueuesManager.getWaitingJobQueue("default")
+      ? scheduler.jobQueuesManager.getJobs("default")
       : scheduler.jobQueuesManager.getRunningJobQueue("default");
     return queue.toArray(new JobInProgress[0]);
   }
@@ -711,24 +823,23 @@
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
+    HashMap<String, ArrayList<FakeJobInProgress>> subJobsList = 
+      submitJobs(1, 4, "default");
+   
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
     
-    // submit a job
-    JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    while(mgr.getJobs("default").size() < 4){
+      Thread.sleep(1);
+    }
+    //Raise status change events for jobs submitted.
+    raiseStatusChangeEvents(mgr);
+    Collection<JobInProgress> jobs = scheduler.getJobs("default");
     
-    // submit another job
-    JobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+    assertTrue("Number of jobs returned by scheduler is wrong" 
+        ,jobs.size() == 4);
     
-    Collection<JobInProgress> jobs = scheduler.getJobs("default");
-    assertEquals(2, jobs.size());
-    Iterator<JobInProgress> iter = jobs.iterator();
-    assertEquals(j1, iter.next());
-    assertEquals(j2, iter.next());
-    
-    assertEquals(1, scheduler.jobQueuesManager.
-                        getRunningJobQueue("default").size());
-    assertEquals(1, scheduler.jobQueuesManager.
-                        getWaitingJobQueue("default").size());
+    assertTrue("Submitted jobs and Returned jobs are not same",
+        subJobsList.get("u1").containsAll(jobs));
   }
   
   //Basic test to test GC allocation across the queues which have no
@@ -970,13 +1081,16 @@
     // u1 finishes a task
     taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
     // u1 submits a few more jobs 
-    submitJob(JobStatus.PREP, 10, 10, null, "u1");
-    submitJob(JobStatus.PREP, 10, 10, null, "u1");
-    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // All the jobs are inited when submitted;
+    // because of the addition of the Eager Job Initializer, all jobs in this
+    // case would be initialised.
+    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // u2 also submits a job
-    submitJob(JobStatus.PREP, 10, 10, null, "u2");
+    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
     // now u3 submits a job
-    submitJob(JobStatus.PREP, 2, 2, null, "u3");
+    submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
     // next slot should go to u3, even though u2 has an earlier job, since
     // user limits have changed and u1/u2 are over limits
     checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
@@ -1013,7 +1127,7 @@
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     // now submit a job to q2
-    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // update our structures
     scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
@@ -1070,7 +1184,7 @@
     // at this point, q3 is running 5 tasks (with a cap of 2), q4 is
     // running 3 tasks (with a cap of 1). 
     // If we submit a job to 'default', we need to get 3 slots back. 
-    FakeJobInProgress j4 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // update our structures
     scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
@@ -1184,4 +1298,287 @@
     return tasks.get(0);
   }
   
+  /*
+   * Test cases for Job Initialization poller.
+   */
+  
+  /*
+   * This test verifies that the correct number of jobs for the
+   * correct number of users is initialized.
+   * It also verifies that as jobs of users complete, new jobs
+   * from the correct users are initialized.
+   */
+  public void testJobInitialization() throws Exception {
+    // set up the scheduler
+    String[] qs = { "default" };
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+                                        scheduler.jobQueuesManager,
+                                        resConf,
+                                        resConf.getQueues());
+    scheduler.setInitializationPoller(p);
+    scheduler.start();
+  
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+
+    // submit 4 jobs each for 3 users.
+    HashMap<String, ArrayList<FakeJobInProgress>> userJobs = submitJobs(3,
+        4, "default");
+
+    // get the jobs submitted.
+    ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
+    ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
+    ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
+    
+    // reference to the initializedJobs data structure
+    // changes are reflected in the set as they are made by the poller
+    HashSet<JobID> initializedJobs = initPoller.getInitializedJobList();
+    
+    // we should have 12 (3 x 4) jobs in the job queue
+    assertEquals(12, mgr.getJobs("default").size());
+
+    // run one poller iteration.
+    p.selectJobsToInitialize();
+    
+    // the poller should initialize 6 jobs
+    // 3 users and 2 jobs from each
+    assertEquals(6, initializedJobs.size());
+
+    assertTrue("Initialized jobs didnt contain the user1 job 1",
+        initializedJobs.contains(u1Jobs.get(0).getJobID()));
+    assertTrue("Initialized jobs didnt contain the user1 job 2",
+        initializedJobs.contains(u1Jobs.get(1).getJobID()));
+    assertTrue("Initialized jobs didnt contain the user2 job 1",
+        initializedJobs.contains(u2Jobs.get(0).getJobID()));
+    assertTrue("Initialized jobs didnt contain the user2 job 2",
+        initializedJobs.contains(u2Jobs.get(1).getJobID()));
+    assertTrue("Initialized jobs didnt contain the user3 job 1",
+        initializedJobs.contains(u3Jobs.get(0).getJobID()));
+    assertTrue("Initialized jobs didnt contain the user3 job 2",
+        initializedJobs.contains(u3Jobs.get(1).getJobID()));
+    
+    // now submit one more job from another user.
+    FakeJobInProgress u4j1 = 
+      submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+
+    // run the poller again.
+    p.selectJobsToInitialize();
+    
+    // since no jobs have started running, there should be no
+    // change to the initialized jobs.
+    assertEquals(6, initializedJobs.size());
+    assertFalse("Initialized jobs contains user 4 jobs",
+        initializedJobs.contains(u4j1.getJobID()));
+    
+    // This event simulates raising the event on completion of setup task
+    // and moves the job to the running list for the scheduler to pick up.
+    raiseStatusChangeEvents(mgr);
+    
+    // get some tasks assigned.
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    Task t3 = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    Task t4 = checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(0));
+    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(0));
+    taskTrackerManager.finishTask("tt2", t3.getTaskID().toString(), u1Jobs.get(1));
+    taskTrackerManager.finishTask("tt2", t4.getTaskID().toString(), u1Jobs.get(1));
+
+    // as some jobs have running tasks, the poller will now
+    // pick up new jobs to initialize.
+    p.selectJobsToInitialize();
+
+    // count should still be the same
+    assertEquals(6, initializedJobs.size());
+    
+    // new jobs that have got into the list
+    assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
+    assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
+    raiseStatusChangeEvents(mgr);
+    
+    // the first two jobs are done, no longer in the initialized list.
+    assertFalse("Initialized jobs contains the user1 job 1",
+        initializedJobs.contains(u1Jobs.get(0).getJobID()));
+    assertFalse("Initialized jobs contains the user1 job 2",
+        initializedJobs.contains(u1Jobs.get(1).getJobID()));
+    
+    // finish one more job
+    t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    t2 = checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(2));
+    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(2));
+
+    // no new jobs should be picked up, because max user limit
+    // is still 3.
+    p.selectJobsToInitialize();
+    
+    assertEquals(5, initializedJobs.size());
+    
+    // run one more job; keep the map and reduce tasks in separate variables
+    t1 = checkAssignment("tt1", "attempt_test_0004_m_000001_0 on tt1");
+    t2 = checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(3));
+    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(3));
+    
+    // Now initialised jobs should contain user 4's job, as
+    // user 1's jobs are all done and the number of users is
+    // below the limit
+    p.selectJobsToInitialize();
+    assertEquals(5, initializedJobs.size());
+    assertTrue(initializedJobs.contains(u4j1.getJobID()));
+    
+    p.stopRunning();
+  }
+
+  /*
+   * testHighPriorityJobInitialization() shows behaviour when high priority job
+   * is submitted into a queue and how initialisation happens for the same.
+   */
+  public void testHighPriorityJobInitialization() throws Exception {
+    String[] qs = { "default"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+                                            scheduler.jobQueuesManager,
+                                            resConf,
+                                            resConf.getQueues());
+    scheduler.setInitializationPoller(p);
+    scheduler.start();
+    JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+    HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+
+    // submit 3 jobs for 3 users
+    submitJobs(3,3,"default");
+    p.selectJobsToInitialize();
+    assertEquals(6, initializedJobsList.size());
+    
+    // submit 2 jobs for a different user. one of them will be made high priority
+    FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+    FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+    
+    p.selectJobsToInitialize();
+    
+    // shouldn't change
+    assertEquals(6, initializedJobsList.size());
+    
+    assertFalse("Contains U4J1 high priority job " , 
+        initializedJobsList.contains(u4j1.getJobID()));
+    assertFalse("Contains U4J2 Normal priority job " , 
+        initializedJobsList.contains(u4j2.getJobID()));
+
+    // change priority of one job
+    taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
+    
+    p.selectJobsToInitialize();
+    
+    // the high priority job should get initialized, but not the
+    // low priority job from u4, as we have already exceeded the
+    // limit.
+    assertEquals(7, initializedJobsList.size());
+    assertTrue("Does not contain U4J1 high priority job " , 
+        initializedJobsList.contains(u4j1.getJobID()));
+    assertFalse("Contains U4J2 Normal priority job " , 
+        initializedJobsList.contains(u4j2.getJobID()));
+    p.stopRunning();
+  }
+  
+  // Checks that an initialized job is in both queues once running, leaves the
+  // job queue once tasks are scheduled, and the running queue on completion.
+  public void testJobMovement() throws Exception {
+    String[] qs = { "default"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+        scheduler.jobQueuesManager, resConf, resConf.getQueues());
+    scheduler.setInitializationPoller(p);
+    scheduler.start();
+    
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+    HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+    
+    // submit a job
+    FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+    p.selectJobsToInitialize();
+    
+    assertEquals(1, initializedJobsList.size());
+
+    // make it running.
+    raiseStatusChangeEvents(mgr);
+    
+    // it should be there in both the queues.
+    assertTrue("Job not present in Job Queue",
+        mgr.getJobs("default").contains(job));
+    assertTrue("Job not present in Running Queue",
+        mgr.getRunningJobQueue("default").contains(job));
+    
+    // assign a task
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    
+    p.selectJobsToInitialize();
+    
+    // now this job should be removed from the initialized list.
+    assertTrue(initializedJobsList.isEmpty());
+
+    // the job should also be removed from the job queue as tasks
+    // are scheduled
+    assertFalse("Job present in Job Queue",
+        mgr.getJobs("default").contains(job));
+    
+    // complete tasks and job
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job);
+    taskTrackerManager.finalizeJob(job);
+    
+    // make sure it is removed from the run queue
+    assertFalse("Job present in running queue",
+        mgr.getRunningJobQueue("default").contains(job));
+    p.stopRunning(); // stop the poller's workers, as the other poller tests do
+  }
+  
+  // Raise RUN_STATE_CHANGED for each RUNNING job in the "default" job list.
+  private void raiseStatusChangeEvents(JobQueuesManager mgr) {
+    for (JobInProgress jip : mgr.getJobs("default")) {
+      if (jip.getStatus().getRunState() != JobStatus.RUNNING) {
+        continue;
+      }
+      mgr.jobUpdated(new JobStatusChangeEvent(jip,
+          EventType.RUN_STATE_CHANGED, jip.getStatus()));
+    }
+  }
+
+  // Submit numberOfJobsPerUser jobs for each user u1..uN; returns them by user.
+  private HashMap<String, ArrayList<FakeJobInProgress>> submitJobs(
+      int numberOfUsers, int numberOfJobsPerUser, String queue)
+      throws Exception{
+    HashMap<String, ArrayList<FakeJobInProgress>> jobsByUser = 
+      new HashMap<String, ArrayList<FakeJobInProgress>>();
+    for (int u = 1; u <= numberOfUsers; u++) {
+      String user = "u" + u;
+      ArrayList<FakeJobInProgress> submitted = new ArrayList<FakeJobInProgress>();
+      for (int k = 0; k < numberOfJobsPerUser; k++) {
+        submitted.add(submitJob(JobStatus.PREP, 1, 1, queue, user));
+      }
+      jobsByUser.put(user, submitted);
+    }
+    return jobsByUser;
+  }
+  
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=721415&r1=721414&r2=721415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Fri Nov 28 02:32:01 2008
@@ -52,11 +52,13 @@
         new String[] { "guaranteed-capacity", 
                        "reclaim-time-limit",
                        "supports-priority",
-                       "minimum-user-limit-percent" }, 
+                       "minimum-user-limit-percent",
+                       "maximum-initialized-jobs-per-user"}, 
         new String[] { "100", 
                         "300",
                         "false", 
-                        "100" }
+                        "100",
+                        "2" }
                       );
   }
 
@@ -86,22 +88,26 @@
         new String[] { "guaranteed-capacity", 
                        "reclaim-time-limit",
                        "supports-priority",
-                       "minimum-user-limit-percent" }, 
+                       "minimum-user-limit-percent",
+                       "maximum-initialized-jobs-per-user"}, 
         new String[] { "10", 
                         "600",
                         "true",
-                        "25" }
+                        "25",
+                        "4"}
                       );
 
     Map<String, String> q2Props = setupQueueProperties(
         new String[] { "guaranteed-capacity", 
                        "reclaim-time-limit",
                        "supports-priority",
-                       "minimum-user-limit-percent" }, 
+                       "minimum-user-limit-percent",
+                       "maximum-initialized-jobs-per-user"}, 
         new String[] { "100", 
                         "6000",
                         "false", 
-                        "50" }
+                        "50",
+                        "1"}
                       );
 
     startConfig();
@@ -139,6 +145,7 @@
     }
     expProperties.put("reclaim-time-limit", "300");
     expProperties.put("supports-priority", "false");
+    expProperties.put("maximum-initialized-jobs-per-user", "2");
     queueDetails.put("default", expProperties);
     checkQueueProperties(testConf, queueDetails);
   }