You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/10 07:01:29 UTC

svn commit: r703320 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: ddas
Date: Thu Oct  9 22:01:28 2008
New Revision: 703320

URL: http://svn.apache.org/viewvc?rev=703320&view=rev
Log:
HADOOP-4288. Fixes a NPE problem in CapacityScheduler. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=703320&r1=703319&r2=703320&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Oct  9 22:01:28 2008
@@ -876,6 +876,9 @@
     HADOOP-4018. The number of tasks for a single job cannot exceed a 
     pre-configured maximum value. (dhruba)
 
+    HADOOP-4288. Fixes a NPE problem in CapacityScheduler. 
+   (Amar Kamat via ddas)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=703320&r1=703319&r2=703320&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Oct  9 22:01:28 2008
@@ -543,12 +543,15 @@
       // update user-specific info
       Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
       if (null == i) {
-        qsi.numJobsByUser.put(job.getProfile().getUser(), 1);
+        i = 1;
         qsi.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
       }
       else {
         i++;
       }
+      qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+      LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
+                + job.getProfile().getUser() + ", user now has " + i + " jobs");
     }
     void jobRemoved(JobInProgress job) {
       // update qsi 
@@ -566,7 +569,9 @@
         LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
       }
       else {
-        LOG.debug("User still has jobs, number of users = " + qsi.numJobsByUser.size());
+        qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+        LOG.debug("User still has " + i + " jobs, number of users = "
+                  + qsi.numJobsByUser.size());
       }
     }
 

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=703320&r1=703319&r2=703320&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Oct  9 22:01:28 2008
@@ -78,7 +78,7 @@
     @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
-      if (runningMapTasks == numMapTasks) return null;
+      if (mapTaskCtr == numMapTasks) return null;
       TaskAttemptID attemptId = getTaskAttemptID(true);
       Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
         @Override
@@ -97,7 +97,7 @@
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
-      if (runningReduceTasks == numReduceTasks) return null;
+      if (redTaskCtr == numReduceTasks) return null;
       TaskAttemptID attemptId = getTaskAttemptID(false);
       Task task = new ReduceTask("", attemptId, 0, 10) {
         @Override
@@ -425,6 +425,69 @@
     }
   }*/
   
+  // Tests that tasks can still be assigned when one user has several jobs in
+  // the same queue, including after one of them is removed (HADOOP-4288).
+  public void testJobFinished() throws Exception {
+    taskTrackerManager.addQueues(new String[] {"default"});
+    
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit 2 jobs to the "default" queue, both owned by user "u1"
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
+    
+    // initialize each job's tasks and notify the scheduler's queue manager
+    j1.initTasks();
+    scheduler.jobQueuesManager.jobUpdated(j1);
+    j2.initTasks();
+    scheduler.jobQueuesManager.jobUpdated(j2);
+    
+    // I. Two assignments from the same job while its tasks are running
+    // ask for a task; expected from the first job
+    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // ask again; the next task is still expected from the first job
+    t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    
+    // complete both of the first job's running tasks
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    
+    // II. Assignments spanning both jobs while tasks are running
+    // the first job's next task is handed out first
+    t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+    
+    // the following request is served by the second job
+    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    
+    // complete the running task of each job
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
+    
+    // III. Assignment after the tasks of both jobs have completed
+    // ask for a task; this time it is expected from the second job
+    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    
+    // complete the second job's task
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
+    
+    // IV. Assignment after one of the user's two jobs has finished
+    // mark the first job as succeeded and remove it from the scheduler
+    j1.getStatus().setRunState(JobStatus.SUCCEEDED);
+    scheduler.jobRemoved(j1);
+    
+    // ask for another task from the second job; this only succeeds if the
+    // scheduler's per-user job bookkeeping for "u1" survived removing j1
+    t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+    
+    // complete the task
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
+  }
+  
   // basic tests, should be able to submit to queues
   public void testSubmitToQueues() throws Exception {
     // set up some queues