You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/05 06:13:11 UTC

svn commit: r723586 - in /hadoop/core/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/

Author: yhemanth
Date: Thu Dec  4 21:13:10 2008
New Revision: 723586

URL: http://svn.apache.org/viewvc?rev=723586&view=rev
Log:
HADOOP-4576. Show pending job count instead of task count in the UI per queue in capacity scheduler. Contributed by Sreekanth Ramakrishnan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec  4 21:13:10 2008
@@ -271,6 +271,9 @@
     commit hash for the version and not the message, which requires escaping.
     (cdouglas)
 
+    HADOOP-4576. Show pending job count instead of task count in the UI per
+    queue in capacity scheduler. (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Dec  4 21:13:10 2008
@@ -196,12 +196,18 @@
     private QueueSchedulingInfo mqsi;
     private QueueSchedulingInfo rqsi;
     private boolean supportsPriority;
+    private JobQueuesManager mgr;
+    private long pollingInterval;
     
-    public SchedulingInfo(QueueSchedulingInfo mqsi, 
-        QueueSchedulingInfo rqsi, boolean supportsPriority ) {
+    
+    SchedulingInfo(QueueSchedulingInfo mqsi, 
+        QueueSchedulingInfo rqsi, boolean supportsPriority,
+        JobQueuesManager mgr, long pollingInterval) {
       this.mqsi = mqsi;
       this.rqsi = rqsi;
       this.supportsPriority=supportsPriority;
+      this.mgr = mgr;
+      this.pollingInterval = pollingInterval;
     }
     
     @Override
@@ -220,12 +226,12 @@
           mqsi.numRunningTasks));
       sb.append(String.format("Number of Running Reduces : %d \n", 
           rqsi.numRunningTasks));
-      sb.append(String.format("Number of Waiting Maps : %d \n", 
-          mqsi.numPendingTasks));
-      sb.append(String.format("Number of Waiting Reduces : %d \n", 
-          rqsi.numPendingTasks));
+      sb.append(String.format("Number of Waiting Jobs : %d \n", mgr
+          .getWaitingJobCount(mqsi.queueName)));
       sb.append(String.format("Priority Supported : %s \n",
           supportsPriority?"YES":"NO"));      
+      sb.append(String.format("* Scheduling information can be off by "
+          + "maximum of %s\n", StringUtils.formatTime(pollingInterval)));
       return sb.toString();
     }
   }
@@ -1154,7 +1160,8 @@
       
       SchedulingInfo schedulingInfo = new SchedulingInfo(
           mapScheduler.getQueueSchedulingInfo(queueName),
-          reduceScheduler.getQueueSchedulingInfo(queueName),supportsPrio);
+          reduceScheduler.getQueueSchedulingInfo(queueName),supportsPrio,
+          jobQueuesManager,rmConf.getSleepInterval());
       queueManager.setSchedulerInfo(queueName, schedulingInfo);
       
     }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Dec  4 21:13:10 2008
@@ -121,6 +121,12 @@
       }
     }
     
+    int getWaitingJobCount() {
+      synchronized (jobList) {
+       return jobList.size(); 
+      }
+    }
+    
   }
   
   // we maintain a hashmap of queue-names to queue info
@@ -258,7 +264,7 @@
     }
   }
   
-  public void removeJobFromQueue(JobInProgress job) {
+  void removeJobFromQueue(JobInProgress job) {
     String queue = job.getProfile().getQueueName();
     QueueInfo qi = jobQueues.get(queue);
     qi.removeJob(new JobSchedulingInfo(job));
@@ -267,4 +273,9 @@
   Comparator<JobSchedulingInfo> getComparator(String queue) {
     return jobQueues.get(queue).comparator;
   }
+  
+  int getWaitingJobCount(String queue) {
+    QueueInfo qi = jobQueues.get(queue);
+    return qi.getWaitingJobCount();
+  }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Dec  4 21:13:10 2008
@@ -36,6 +36,7 @@
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -241,6 +242,11 @@
     Set<TaskInProgress> getRunningReduces() {
       return (Set<TaskInProgress>)reduceTips;
     }
+    
+    @Override
+    synchronized void fail() {
+      getStatus().setRunState(JobStatus.FAILED);
+    }
   }
   
   static class FakeTaskInProgress extends TaskInProgress {
@@ -1338,8 +1344,10 @@
     
   }
   
-  public void testSchedulingInformation() throws IOException {
+  public void testSchedulingInformation() throws Exception {
     String[] qs = {"default", "q2"};
+    taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1347,6 +1355,11 @@
     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+        scheduler.jobQueuesManager,
+        resConf,
+        resConf.getQueues());
+    scheduler.setInitializationPoller(p);
     scheduler.start();
     scheduler.assignTasks(tracker("tt1")); // heartbeat
     scheduler.assignTasks(tracker("tt2")); // heartbeat
@@ -1356,6 +1369,7 @@
     String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
     String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
+    
     assertEquals(infoStrings.length, 10);
     assertEquals(infoStrings[0] , "Guaranteed Capacity (%) : 50.0 ");
     assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
@@ -1364,10 +1378,97 @@
     assertEquals(infoStrings[4] , "Reclaim Time limit : 1000000 " );
     assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
     assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
-    assertEquals(infoStrings[7] , "Number of Waiting Maps : 0 ");
-    assertEquals(infoStrings[8] , "Number of Waiting Reduces : 0 ");
-    assertEquals(infoStrings[9] , "Priority Supported : YES ");
-    assertEquals(schedulingInfo, schedulingInfo2);   
+    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 0 ");
+    assertEquals(infoStrings[8] , "Priority Supported : YES ");
+    assertEquals(infoStrings[9] , "* Scheduling information can be off by " +
+        "maximum of "+ StringUtils.formatTime(resConf.getSleepInterval()));
+    assertEquals(schedulingInfo, schedulingInfo2);
+    
+    /*
+     * Following is the testing strategy for testing scheduling information.
+     * - Submit 5 jobs to a queue.
+     * - Check the waiting jobs count, it should be 5.
+     * - Then run initializationPoller()
+     * - Check once again the waiting queue, it should be 5 jobs again.
+     * - Then raise status change events.
+     * - Assign one task to a task tracker.
+     * - Check waiting job count, it should be 4 now.
+     * - Then pick a job that is initialized but not yet scheduled and fail it.
+     * - Run the poller
+     * - Check the waiting job count; it should now be 3.
+     * - Now fail a job which has not been initialized at all.
+     * - Run the poller, so that it can clean up the job queue.
+     * - Check the count, the waiting job count should be 2.
+     */
+    //Testing with actual job submission.
+    ArrayList<FakeJobInProgress> userJobs = 
+      submitJobs(1, 5, "default").get("u1");
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    
+    //waiting job should be equal to number of jobs submitted.
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
+    assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
+    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+    
+    //Initialize the jobs but don't raise events
+    p.selectJobsToInitialize();
+    
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    //should be the previous value, as nothing is scheduled because no events
+    //have been raised after initialization.
+    assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
+    assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
+    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+    
+    //Raise status change event so that jobs can move to running queue.
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+    //assign one job
+    scheduler.assignTasks(tracker("tt1")); // heartbeat
+    //Initialize an extra job.
+    p.selectJobsToInitialize();
+    
+    //Get scheduling information; the number of waiting jobs should now have
+    //changed to 4, as one job is scheduled and has become running.
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    //TODO check running task count also fix in HADOOP-4445
+    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 4 ");
+    
+    
+    //Fail a job which is initialized but not scheduled and check the count.
+    FakeJobInProgress u1j2 = userJobs.get(1);
+    assertTrue("User1 job 2 not initalized ", 
+        u1j2.getStatus().getRunState() == JobStatus.RUNNING);
+    u1j2.fail();
+    //Run initializer to clean up failed jobs
+    p.selectJobsToInitialize();
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 3 ");
+    
+    //Fail a job which is not initialized but is in the waiting queue.
+    FakeJobInProgress u1j5 = userJobs.get(4);
+    assertFalse("User1 job 5 initalized ", 
+        u1j5.getStatus().getRunState() == JobStatus.RUNNING);
+    u1j5.fail();
+    //run initializer to clean up failed job
+    p.selectJobsToInitialize();
+    schedulingInfo = 
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    infoStrings = schedulingInfo.split("\n");
+    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings[7] , "Number of Waiting Jobs : 2 ");
+    
   }
 
   /**