You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2010/07/02 01:04:25 UTC

svn commit: r959806 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Author: matei
Date: Thu Jul  1 23:04:24 2010
New Revision: 959806

URL: http://svn.apache.org/viewvc?rev=959806&view=rev
Log:
MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number. Contributed by Scott Chen.


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=959806&r1=959805&r2=959806&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  1 23:04:24 2010
@@ -82,6 +82,9 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number.
+    (Scott Chen via matei)
+
     MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.
     (Vinod Kumar Vavilapalli)
     

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=959806&r1=959805&r2=959806&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Jul  1 23:04:24 2010
@@ -850,11 +850,11 @@ public class FairScheduler extends TaskS
     int tasksDueToFairShare = 0;
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
       int target = Math.min(sched.getMinShare(), sched.getDemand());
-      tasksDueToMinShare = target - sched.getRunningTasks();
+      tasksDueToMinShare = Math.max(0, target - sched.getRunningTasks());
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
       int target = (int) Math.min(sched.getFairShare(), sched.getDemand());
-      tasksDueToFairShare = target - sched.getRunningTasks();
+      tasksDueToFairShare = Math.max(0, target - sched.getRunningTasks());
     }
     int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
     if (tasksToPreempt > 0) {

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=959806&r1=959805&r2=959806&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Jul  1 23:04:24 2010
@@ -2621,4 +2621,81 @@ public class TestFairScheduler extends T
     for (int i = 0; i < tasks.size(); i++)
       assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());
   }
+
+  /**
+   * This test submits a job that takes all 4 slots of each type in a pool
+   * that has a min share of 2 slots with a minshare timeout of 5s, and then a
+   * second job in the default pool with a fair share timeout of 5s. After 6
+   * seconds, the default pool will be starved of its fair share (2 slots of
+   * each type), and we test that no more than 2 tasks of each type are killed.
+   */
+  public void testFairSharePreemptionWithShortTimeout() throws Exception {
+    // Enable preemption in scheduler
+    scheduler.preemptionEnabled = true;
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>");
+    out.println("<pool name=\"pool1\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+
+    // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+    // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1");
+    JobInfo info1 = scheduler.infos.get(job1);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+
+    advanceTime(10000);
+    assertEquals(4,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(4,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
+    // Ten seconds later, submit job 2.
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default");
+
+    // Advance time by 6 seconds without updating the scheduler.
+    // This simulates the time gap between update and task preemption.
+    clock.advance(6000);
+    assertEquals(4,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(4,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
+        clock.getTime()));
+    assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(),
+        clock.getTime()));
+    assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(),
+        clock.getTime()));
+
+    // Test that the tasks actually get preempted and we can assign new ones
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(2, job1.runningMaps());
+    assertEquals(2, job1.runningReduces());
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
 }