You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/12 15:35:12 UTC
svn commit: r773889 - in /hadoop/core/trunk: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Author: yhemanth
Date: Tue May 12 13:35:11 2009
New Revision: 773889
URL: http://svn.apache.org/viewvc?rev=773889&view=rev
Log:
HADOOP-5641. Fix a NullPointerException in capacity scheduler's memory based scheduling code when jobs get retired. Contributed by Hemanth Yamijala.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 12 13:35:11 2009
@@ -617,6 +617,9 @@
KILLED (this used to happen when the SetupTask would come back with a
success after the job has been killed). (Amar Kamat via ddas)
+ HADOOP-5641. Fix a NullPointerException in capacity scheduler's memory
+ based scheduling code when jobs get retired. (yhemanth)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 12 13:35:11 2009
@@ -413,7 +413,8 @@
}
}
else {
- // mem requirements not met. Rather than look at the next job,
+ // mem requirements not met or could not be computed for this TT
+ // Rather than look at the next job,
// we return nothing to the TT, with the hope that we improve
// chances of finding a suitable TT for this job. This lets us
// avoid starving jobs with high mem requirements.
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue May 12 13:35:11 2009
@@ -112,7 +112,8 @@
* residing on the given TaskTracker.
*
* @param taskTracker
- * @return amount of memory that is used by the residing tasks
+ * @return amount of memory that is used by the residing tasks,
+ * null if memory cannot be computed for some reason.
*/
private synchronized Memory getMemReservedForTasks(
TaskTrackerStatus taskTracker) {
@@ -141,6 +142,26 @@
// accounted in used memory.
if ((task.getRunState() == TaskStatus.State.RUNNING)
|| (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
+ JobInProgress job = scheduler.taskTrackerManager.getJob(
+ task.getTaskID().getJobID());
+ if (job == null) {
+ // This scenario can happen if a job was completed/killed
+ // and retired from JT's memory. In this state, we can ignore
+ // the running task status and compute memory for the rest of
+ // the tasks. However, any scheduling done with this computation
+ // could result in over-subscribing of memory for tasks on this
+ // TT (as the unaccounted for task is still running).
+ // So, it is safer to not schedule anything for this TT
+ // One of the ways of doing that is to return null from here
+ // and check for null in the calling method.
+ LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
+ + "a running / commit pending task: " + task.getTaskID()
+ + " but no corresponding job was found. "
+ + "Maybe job was retired. Not computing "
+ + "memory values for this TT.");
+ return null;
+ }
+
JobConf jConf =
scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
.getJobConf();
@@ -194,6 +215,16 @@
}
Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
+ if (memReservedForTasks == null) {
+ // For some reason, maybe because we could not find the job
+ // corresponding to a running task (as can happen if the job
+ // is retired in between), we could not compute the memory state
+ // on this TT. Treat this as an error, and fail memory
+ // requirements.
+ LOG.info("Could not compute memory for taskTracker: "
+ + taskTracker.getHost() + ". Failing memory requirements.");
+ return false;
+ }
long vmemUsedOnTT = memReservedForTasks.vmem;
long pmemUsedOnTT = memReservedForTasks.pmem;
@@ -243,4 +274,4 @@
+ jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
return true;
}
-}
\ No newline at end of file
+}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=773889&r1=773888&r2=773889&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 12 13:35:11 2009
@@ -385,6 +385,10 @@
job.kill();
}
+ public void removeJob(JobID jobid) {
+ jobs.remove(jobid);
+ }
+
@Override
public JobInProgress getJob(JobID jobid) {
return jobs.get(jobid);
@@ -1791,6 +1795,84 @@
assertNull(scheduler.assignTasks(tracker("tt1")));
}
+ /**
+ * Testcase to verify fix for a NPE (HADOOP-5641), when memory based
+ * scheduling is enabled and jobs are retired from memory when tasks
+ * are still active on some Tasktrackers.
+ *
+ * @throws IOException
+ */
+ public void testMemoryMatchingWithRetiredJobs() throws IOException {
+ // create a cluster with a single node.
+ LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
+ taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
+ TaskTrackerStatus.ResourceStatus ttStatus =
+ taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+ LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
+ ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
+ ttStatus.setReservedVirtualMemory(0);
+ ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
+ ttStatus.setReservedPhysicalMemory(0);
+
+ // create scheduler
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+ taskTrackerManager.addQueues(new String[] { "default" });
+ resConf.setFakeQueues(queues);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ LOG.debug("By default, jobs get 0.5 GB per task vmem" +
+ " and 2 GB max vmem, with 50% of it for RAM");
+ scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+ 512 * 1024 * 1024L);
+ scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+ 2 * 1024 * 1024 * 1024L);
+ resConf.setDefaultPercentOfPmemInVmem(50.0f);
+ resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit a normal job
+ LOG.debug("Submitting a normal job with 2 maps and 2 reduces");
+ JobConf jConf = new JobConf();
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // 1st cycle - 1 map gets assigned.
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+ // kill this job !
+ taskTrackerManager.killJob(job1.getJobID());
+
+ // retire the job
+ taskTrackerManager.removeJob(job1.getJobID());
+
+ // submit another job.
+ LOG.debug("Submitting another normal job with 1 map and 1 reduce");
+ jConf = new JobConf();
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // 2nd cycle - nothing should get assigned. Memory matching code
+ // will see the job is missing and fail memory requirements.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ // calling again should not make a difference, as the task is still running
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+
+ // finish the task on the tracker.
+ taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+ // now a new task can be assigned.
+ t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // reduce can be assigned.
+ t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ }
+
protected TaskTrackerStatus tracker(String taskTrackerName) {
return taskTrackerManager.getTaskTracker(taskTrackerName);
}