You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/10 07:01:29 UTC
svn commit: r703320 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: ddas
Date: Thu Oct 9 22:01:28 2008
New Revision: 703320
URL: http://svn.apache.org/viewvc?rev=703320&view=rev
Log:
HADOOP-4288. Fixes a NPE problem in CapacityScheduler. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=703320&r1=703319&r2=703320&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Oct 9 22:01:28 2008
@@ -876,6 +876,9 @@
HADOOP-4018. The number of tasks for a single job cannot exceed a
pre-configured maximum value. (dhruba)
+ HADOOP-4288. Fixes a NPE problem in CapacityScheduler.
+ (Amar Kamat via ddas)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=703320&r1=703319&r2=703320&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Oct 9 22:01:28 2008
@@ -543,12 +543,15 @@
// update user-specific info
Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
if (null == i) {
- qsi.numJobsByUser.put(job.getProfile().getUser(), 1);
+ i = 1;
qsi.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
}
else {
i++;
}
+ qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+ LOG.debug("Job " + job.getJobID().toString() + " is added under user "
+ + job.getProfile().getUser() + ", user now has " + i + " jobs");
}
void jobRemoved(JobInProgress job) {
// update qsi
@@ -566,7 +569,9 @@
LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
}
else {
- LOG.debug("User still has jobs, number of users = " + qsi.numJobsByUser.size());
+ qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+ LOG.debug("User still has " + i + " jobs, number of users = "
+ + qsi.numJobsByUser.size());
}
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=703320&r1=703319&r2=703320&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Oct 9 22:01:28 2008
@@ -78,7 +78,7 @@
@Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
- if (runningMapTasks == numMapTasks) return null;
+ if (mapTaskCtr == numMapTasks) return null;
TaskAttemptID attemptId = getTaskAttemptID(true);
Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
@Override
@@ -97,7 +97,7 @@
@Override
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
- if (runningReduceTasks == numReduceTasks) return null;
+ if (redTaskCtr == numReduceTasks) return null;
TaskAttemptID attemptId = getTaskAttemptID(false);
Task task = new ReduceTask("", attemptId, 0, 10) {
@Override
@@ -425,6 +425,69 @@
}
}*/
+ // tests if tasks can be assigned when there are multiple jobs from the same
+ // user
+ public void testJobFinished() throws Exception {
+ taskTrackerManager.addQueues(new String[] {"default"});
+
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit 2 jobs
+ FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
+ FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
+
+ // init them and inform the scheduler
+ j1.initTasks();
+ scheduler.jobQueuesManager.jobUpdated(j1);
+ j2.initTasks();
+ scheduler.jobQueuesManager.jobUpdated(j2);
+
+ // I. Check multiple assignments with running tasks within job
+ // ask for a task from first job
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // ask for another task from the first job
+ t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+
+ // complete tasks
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+
+ // II. Check multiple assignments with running tasks across jobs
+ // ask for a task from first job
+ t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+
+ // ask for a task from the second job
+ t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+ // complete tasks
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
+
+ // III. Check multiple assignments with completed tasks across jobs
+ // ask for a task from the second job
+ t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+
+ // complete task
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
+
+ // IV. Check assignment with completed job
+ // finish first job
+ j1.getStatus().setRunState(JobStatus.SUCCEEDED);
+ scheduler.jobRemoved(j1);
+
+ // ask for another task from the second job
+ // if tasks can be assigned then the structures are properly updated
+ t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+
+ // complete task
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
+ }
+
// basic tests, should be able to submit to queues
public void testSubmitToQueues() throws Exception {
// set up some queues