You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2010/07/02 01:04:25 UTC
svn commit: r959806 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Author: matei
Date: Thu Jul 1 23:04:24 2010
New Revision: 959806
URL: http://svn.apache.org/viewvc?rev=959806&view=rev
Log:
MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number. Contributed by Scott Chen.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=959806&r1=959805&r2=959806&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 1 23:04:24 2010
@@ -82,6 +82,9 @@ Trunk (unreleased changes)
BUG FIXES
+ MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number.
+ (Scott Chen via matei)
+
MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.
(Vinod Kumar Vavilapalli)
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=959806&r1=959805&r2=959806&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Jul 1 23:04:24 2010
@@ -850,11 +850,11 @@ public class FairScheduler extends TaskS
int tasksDueToFairShare = 0;
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
int target = Math.min(sched.getMinShare(), sched.getDemand());
- tasksDueToMinShare = target - sched.getRunningTasks();
+ tasksDueToMinShare = Math.max(0, target - sched.getRunningTasks());
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
int target = (int) Math.min(sched.getFairShare(), sched.getDemand());
- tasksDueToFairShare = target - sched.getRunningTasks();
+ tasksDueToFairShare = Math.max(0, target - sched.getRunningTasks());
}
int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
if (tasksToPreempt > 0) {
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=959806&r1=959805&r2=959806&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Jul 1 23:04:24 2010
@@ -2621,4 +2621,81 @@ public class TestFairScheduler extends T
for (int i = 0; i < tasks.size(); i++)
assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());
}
+
+ /**
+ * This test submits a job that takes all slots, running in a pool that has a
+ * min share of 2 slots with a minshare timeout of 5s, and then a second job in
+ * the default pool with a fair share timeout of 5s. After 6 seconds, the
+ * default pool will be starved of its fair share (2 slots of each type), and
+ * we test that the scheduler does not kill more than 2 tasks of each type.
+ */
+ public void testFairSharePreemptionWithShortTimeout() throws Exception {
+ // Enable preemption in scheduler
+ scheduler.preemptionEnabled = true;
+ // Set up pools file
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>");
+ out.println("<pool name=\"pool1\">");
+ out.println("<minMaps>2</minMaps>");
+ out.println("<minReduces>2</minReduces>");
+ out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+ out.println("</pool>");
+ out.println("</allocations>");
+ out.close();
+ scheduler.getPoolManager().reloadAllocs();
+ Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+ Pool defaultPool = scheduler.getPoolManager().getPool("default");
+
+ // Submit job 1 and assign all slots to it. Sleep a bit between assigning
+ // tasks on tt1 and on tt2 to ensure that the ones on tt2 get preempted first.
+ JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1");
+ JobInfo info1 = scheduler.infos.get(job1);
+ checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ advanceTime(100);
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+
+ advanceTime(10000);
+ assertEquals(4, info1.mapSchedulable.getRunningTasks());
+ assertEquals(4, info1.reduceSchedulable.getRunningTasks());
+ assertEquals(4.0, info1.mapSchedulable.getFairShare());
+ assertEquals(4.0, info1.reduceSchedulable.getFairShare());
+ // Ten seconds later, submit job 2.
+ JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default");
+
+ // Advance time by 6 seconds without updating the scheduler.
+ // This simulates the time gap between update and task preemption.
+ clock.advance(6000);
+ assertEquals(4, info1.mapSchedulable.getRunningTasks());
+ assertEquals(4, info1.reduceSchedulable.getRunningTasks());
+ assertEquals(2.0, info1.mapSchedulable.getFairShare());
+ assertEquals(2.0, info1.reduceSchedulable.getFairShare());
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
+ clock.getTime()));
+ assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(),
+ clock.getTime()));
+
+ // Test that the tasks actually get preempted and we can assign new ones
+ scheduler.preemptTasksIfNecessary();
+ scheduler.update();
+ assertEquals(2, job1.runningMaps());
+ assertEquals(2, job1.runningReduces());
+ checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ }
}