You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/12 15:35:12 UTC

svn commit: r773889 - in /hadoop/core/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/

Author: yhemanth
Date: Tue May 12 13:35:11 2009
New Revision: 773889

URL: http://svn.apache.org/viewvc?rev=773889&view=rev
Log:
HADOOP-5641. Fix a NullPointerException in capacity scheduler's memory-based scheduling code when jobs get retired. Contributed by Hemanth Yamijala.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 12 13:35:11 2009
@@ -617,6 +617,9 @@
     KILLED (this used to happen when the SetupTask would come back with a 
     success after the job has been killed). (Amar Kamat via ddas)
 
+    HADOOP-5641. Fix a NullPointerException in capacity scheduler's memory
+    based scheduling code when jobs get retired. (yhemanth)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 12 13:35:11 2009
@@ -413,7 +413,8 @@
             }
           }
           else {
-            // mem requirements not met. Rather than look at the next job, 
+            // mem requirements not met or could not be computed for this TT
+            // Rather than look at the next job, 
             // we return nothing to the TT, with the hope that we improve 
             // chances of finding a suitable TT for this job. This lets us
             // avoid starving jobs with high mem requirements.         

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue May 12 13:35:11 2009
@@ -112,7 +112,8 @@
    * residing on the given TaskTracker.
    * 
    * @param taskTracker
-   * @return amount of memory that is used by the residing tasks
+   * @return amount of memory that is used by the residing tasks,
+   *          null if memory cannot be computed for some reason.
    */
   private synchronized Memory getMemReservedForTasks(
       TaskTrackerStatus taskTracker) {
@@ -141,6 +142,26 @@
       // accounted in used memory.
       if ((task.getRunState() == TaskStatus.State.RUNNING)
           || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
+        JobInProgress job = scheduler.taskTrackerManager.getJob(
+                                              task.getTaskID().getJobID());
+        if (job == null) {
+          // This scenario can happen if a job was completed/killed
+          // and retired from JT's memory. In this state, we can ignore 
+          // the running task status and compute memory for the rest of 
+          // the tasks. However, any scheduling done with this computation
+          // could result in over-subscribing of memory for tasks on this
+          // TT (as the unaccounted for task is still running).
+          // So, it is safer to not schedule anything for this TT.
+          // One of the ways of doing that is to return null from here
+          // and check for null in the calling method.
+          LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
+                    + "a running / commit pending task: " + task.getTaskID()
+                    + " but no corresponding job was found. "
+                    + "Maybe job was retired. Not computing "
+                    + "memory values for this TT.");
+          return null;
+        }
+        
         JobConf jConf =
             scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
                 .getJobConf();
@@ -194,6 +215,16 @@
     }
 
     Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
+    if (memReservedForTasks == null) {
+      // For some reason, maybe because we could not find the job
+      // corresponding to a running task (as can happen if the job
+      // is retired in between), we could not compute the memory state
+      // on this TT. Treat this as an error, and fail memory
+      // requirements.
+      LOG.info("Could not compute memory for taskTracker: " 
+                + taskTracker.getHost() + ". Failing memory requirements.");
+      return false;
+    }
     long vmemUsedOnTT = memReservedForTasks.vmem;
     long pmemUsedOnTT = memReservedForTasks.pmem;
 
@@ -243,4 +274,4 @@
         + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
     return true;
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 12 13:35:11 2009
@@ -385,6 +385,10 @@
       job.kill();
     }
 
+    public void removeJob(JobID jobid) {
+      jobs.remove(jobid);
+    }
+    
     @Override
     public JobInProgress getJob(JobID jobid) {
       return jobs.get(jobid);
@@ -1791,6 +1795,84 @@
     assertNull(scheduler.assignTasks(tracker("tt1")));
   }
 
+  /**
+   * Testcase to verify fix for a NPE (HADOOP-5641), when memory based
+   * scheduling is enabled and jobs are retired from memory when tasks
+   * are still active on some Tasktrackers.
+   *  
+   * @throws IOException
+   */
+  public void testMemoryMatchingWithRetiredJobs() throws IOException {
+    // create a cluster with a single node.
+    LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
+    ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    // create scheduler
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    LOG.debug("By default, jobs get 0.5 GB per task vmem" +
+        " and 2 GB max vmem, with 50% of it for RAM");
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        512 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        2 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(50.0f);
+    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // submit a normal job
+    LOG.debug("Submitting a normal job with 2 maps and 2 reduces");
+    JobConf jConf = new JobConf();
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // 1st cycle - 1 map gets assigned.
+    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    
+    // kill this job !
+    taskTrackerManager.killJob(job1.getJobID());
+    
+    // retire the job
+    taskTrackerManager.removeJob(job1.getJobID());
+    
+    // submit another job.
+    LOG.debug("Submitting another normal job with 1 map and 1 reduce");
+    jConf = new JobConf();
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+    
+    // 2nd cycle - nothing should get assigned. Memory matching code
+    // will see the job is missing and fail memory requirements.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    // calling again should not make a difference, as the task is still running
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    
+    // finish the task on the tracker.
+    taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+    // now a new task can be assigned.
+    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // reduce can be assigned.
+    t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+  }
+  
   protected TaskTrackerStatus tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }