You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/05 06:13:11 UTC
svn commit: r723586 - in /hadoop/core/trunk: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Author: yhemanth
Date: Thu Dec 4 21:13:10 2008
New Revision: 723586
URL: http://svn.apache.org/viewvc?rev=723586&view=rev
Log:
HADOOP-4576. Show pending job count instead of task count in the UI per queue in capacity scheduler. Contributed by Sreekanth Ramakrishnan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 4 21:13:10 2008
@@ -271,6 +271,9 @@
commit hash for the version and not the message, which requires escaping.
(cdouglas)
+ HADOOP-4576. Show pending job count instead of task count in the UI per
+ queue in capacity scheduler. (Sreekanth Ramakrishnan via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Dec 4 21:13:10 2008
@@ -196,12 +196,18 @@
private QueueSchedulingInfo mqsi;
private QueueSchedulingInfo rqsi;
private boolean supportsPriority;
+ private JobQueuesManager mgr;
+ private long pollingInterval;
- public SchedulingInfo(QueueSchedulingInfo mqsi,
- QueueSchedulingInfo rqsi, boolean supportsPriority ) {
+
+ SchedulingInfo(QueueSchedulingInfo mqsi,
+ QueueSchedulingInfo rqsi, boolean supportsPriority,
+ JobQueuesManager mgr, long pollingInterval) {
this.mqsi = mqsi;
this.rqsi = rqsi;
this.supportsPriority=supportsPriority;
+ this.mgr = mgr;
+ this.pollingInterval = pollingInterval;
}
@Override
@@ -220,12 +226,12 @@
mqsi.numRunningTasks));
sb.append(String.format("Number of Running Reduces : %d \n",
rqsi.numRunningTasks));
- sb.append(String.format("Number of Waiting Maps : %d \n",
- mqsi.numPendingTasks));
- sb.append(String.format("Number of Waiting Reduces : %d \n",
- rqsi.numPendingTasks));
+ sb.append(String.format("Number of Waiting Jobs : %d \n", mgr
+ .getWaitingJobCount(mqsi.queueName)));
sb.append(String.format("Priority Supported : %s \n",
supportsPriority?"YES":"NO"));
+ sb.append(String.format("* Scheduling information can be off by "
+ + "maximum of %s\n", StringUtils.formatTime(pollingInterval)));
return sb.toString();
}
}
@@ -1154,7 +1160,8 @@
SchedulingInfo schedulingInfo = new SchedulingInfo(
mapScheduler.getQueueSchedulingInfo(queueName),
- reduceScheduler.getQueueSchedulingInfo(queueName),supportsPrio);
+ reduceScheduler.getQueueSchedulingInfo(queueName),supportsPrio,
+ jobQueuesManager,rmConf.getSleepInterval());
queueManager.setSchedulerInfo(queueName, schedulingInfo);
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Dec 4 21:13:10 2008
@@ -121,6 +121,12 @@
}
}
+ int getWaitingJobCount() {
+ synchronized (jobList) {
+ return jobList.size();
+ }
+ }
+
}
// we maintain a hashmap of queue-names to queue info
@@ -258,7 +264,7 @@
}
}
- public void removeJobFromQueue(JobInProgress job) {
+ void removeJobFromQueue(JobInProgress job) {
String queue = job.getProfile().getQueueName();
QueueInfo qi = jobQueues.get(queue);
qi.removeJob(new JobSchedulingInfo(job));
@@ -267,4 +273,9 @@
Comparator<JobSchedulingInfo> getComparator(String queue) {
return jobQueues.get(queue).comparator;
}
+
+ int getWaitingJobCount(String queue) {
+ QueueInfo qi = jobQueues.get(queue);
+ return qi.getWaitingJobCount();
+ }
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=723586&r1=723585&r2=723586&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Dec 4 21:13:10 2008
@@ -36,6 +36,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -241,6 +242,11 @@
Set<TaskInProgress> getRunningReduces() {
return (Set<TaskInProgress>)reduceTips;
}
+
+ @Override
+ synchronized void fail() {
+ getStatus().setRunState(JobStatus.FAILED);
+ }
}
static class FakeTaskInProgress extends TaskInProgress {
@@ -1338,8 +1344,10 @@
}
- public void testSchedulingInformation() throws IOException {
+ public void testSchedulingInformation() throws Exception {
String[] qs = {"default", "q2"};
+ taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1347,6 +1355,11 @@
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues());
+ scheduler.setInitializationPoller(p);
scheduler.start();
scheduler.assignTasks(tracker("tt1")); // heartbeat
scheduler.assignTasks(tracker("tt2")); // heartbeat
@@ -1356,6 +1369,7 @@
String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
+
assertEquals(infoStrings.length, 10);
assertEquals(infoStrings[0] , "Guaranteed Capacity (%) : 50.0 ");
assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
@@ -1364,10 +1378,97 @@
assertEquals(infoStrings[4] , "Reclaim Time limit : 1000000 " );
assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
- assertEquals(infoStrings[7] , "Number of Waiting Maps : 0 ");
- assertEquals(infoStrings[8] , "Number of Waiting Reduces : 0 ");
- assertEquals(infoStrings[9] , "Priority Supported : YES ");
- assertEquals(schedulingInfo, schedulingInfo2);
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 0 ");
+ assertEquals(infoStrings[8] , "Priority Supported : YES ");
+ assertEquals(infoStrings[9] , "* Scheduling information can be off by " +
+ "maximum of "+ StringUtils.formatTime(resConf.getSleepInterval()));
+ assertEquals(schedulingInfo, schedulingInfo2);
+
+ /*
+ * Following is the testing strategy for testing scheduling information.
+ * - Submit 5 jobs to a queue.
+ * - Check the waiting jobs count, it should be 5.
+ * - Then run the initialization poller (selectJobsToInitialize()).
+ * - Check the waiting job count once again, it should still be 5.
+ * - Then raise status change events.
+ * - Assign one task to a task tracker.
+ * - Check waiting job count, it should be 4 now.
+ * - Then pick a job that is initialized but not yet scheduled and fail it.
+ * - Run the poller
+ * - Check the waiting job count, it should now be 3.
+ * - Now fail a job which has not been initialized at all.
+ * - Run the poller, so that it can clean up the job queue.
+ * - Check the count, the waiting job count should be 2.
+ */
+ //Testing with actual job submission.
+ ArrayList<FakeJobInProgress> userJobs =
+ submitJobs(1, 5, "default").get("u1");
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+
+ //waiting job count should be equal to the number of jobs submitted.
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
+ assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+
+ //Initialize the jobs but don't raise events
+ p.selectJobsToInitialize();
+
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ //should be the previous value as nothing is scheduled because no events
+ //have been raised after initialization.
+ assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
+ assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+
+ //Raise status change event so that jobs can move to running queue.
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
+ //assign one job
+ scheduler.assignTasks(tracker("tt1")); // heartbeat
+ //Initialize extra job.
+ p.selectJobsToInitialize();
+
+ //Get scheduling information, now the number of waiting jobs should have
+ //changed to 4 as one is scheduled and has become running.
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ //TODO check running task count also fix in HADOOP-4445
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 4 ");
+
+
+ //Fail a job which is initialized but not scheduled and check the count.
+ FakeJobInProgress u1j2 = userJobs.get(1);
+ assertTrue("User1 job 2 not initalized ",
+ u1j2.getStatus().getRunState() == JobStatus.RUNNING);
+ u1j2.fail();
+ //Run initializer to clean up failed jobs
+ p.selectJobsToInitialize();
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 3 ");
+
+ //Fail a job which is not initialized but is in the waiting queue.
+ FakeJobInProgress u1j5 = userJobs.get(4);
+ assertFalse("User1 job 5 initalized ",
+ u1j5.getStatus().getRunState() == JobStatus.RUNNING);
+ u1j5.fail();
+ //run initializer to clean up failed job
+ p.selectJobsToInitialize();
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 2 ");
+
}
/**