You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/02/01 02:39:13 UTC

svn commit: r1065885 - in /hadoop/mapreduce/branches/branch-0.22: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Author: shv
Date: Tue Feb  1 01:39:13 2011
New Revision: 1065885

URL: http://svn.apache.org/viewvc?rev=1065885&view=rev
Log:
MAPREDUCE-2256. Merge -r 1065882:1065883 from trunk to branch 0.22.

Modified:
    hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1065885&r1=1065884&r2=1065885&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Tue Feb  1 01:39:13 2011
@@ -432,11 +432,11 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-2282. Fix TestMRServerPorts for the changes in
     TestHDFSServerPorts.  (shv via szetszwo)
 
-    MAPREDUCE-2238. Fix permissions handling to avoid leaving undeletable directories
-    in local dirs. (todd)
+    MAPREDUCE-2238. Fix permissions handling to avoid leaving undeletable 
+    directories in local dirs. (todd)
 
-    MAPREDUCE-2277. TestCapacitySchedulerWithJobTracker needs to wait for jobs to
-    complete before testing status. (todd)
+    MAPREDUCE-2277. TestCapacitySchedulerWithJobTracker needs to wait for jobs
+    to complete before testing status. (todd)
 
     MAPREDUCE-2253. Servlets should specify content type (todd)
 
@@ -445,6 +445,10 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-1754. Replace mapred.persmissions.supergroup with an 
     acl : mapreduce.cluster.administrators (Amareshwari Sriramadasu via shv)
 
+    MAPREDUCE-2256. FairScheduler fair share preemption from multiple pools
+    may preempt all tasks from one pool, causing that pool to go below its
+    fair share. (Priyo Mustafi via shv)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1065885&r1=1065884&r2=1065885&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Feb  1 01:39:13 2011
@@ -817,7 +817,8 @@ public class FairScheduler extends TaskS
       JobInProgress job = taskTrackerManager.getJob(jobID);
       Pool pool = poolMgr.getPool(job);
       PoolSchedulable sched = pool.getSchedulable(taskType);
-      if (tasksLeft.get(pool) > sched.getFairShare()) {
+      int tasksLeftForPool = tasksLeft.get(pool);
+      if (tasksLeftForPool > sched.getFairShare()) {
         eventLog.log("PREEMPT", status.getTaskID(),
             status.getTaskTracker());
         try {
@@ -825,6 +826,9 @@ public class FairScheduler extends TaskS
           tasksToPreempt--;
           if (tasksToPreempt == 0)
             break;
+          
+          // reduce tasks left for pool
+          tasksLeft.put(pool, --tasksLeftForPool);
         } catch (IOException e) {
           LOG.error("Failed to kill task " + status.getTaskID(), e);
         }

Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1065885&r1=1065884&r2=1065885&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Feb  1 01:39:13 2011
@@ -1957,6 +1957,101 @@ public class TestFairScheduler extends T
   }
   
   /**
+   * Regression test for MAPREDUCE-2256. Runs on a 3-node (6-slot) cluster so
+   * that 3 pools with fair shares of 2 slots each can coexist (making each
+   * pool's half-fair-share equal to 1, so fair share preemption can kick in).
+   *
+   * Job 1 (pool 1) takes 3 map slots and job 2 (pool 2) takes the other 3;
+   * neither has reduces. Job 3 (pool 3) is then submitted and gets no slots.
+   * Each pool's fair share is now 6/3 = 2: pools 1 and 2 are above it and
+   * pool 3 is below half of it. After the timeout, pool 3 should preempt one
+   * task from EACH of pools 1 and 2 rather than both tasks from a single pool
+   * (which would push that pool below its fair share), and pools 1 and 2
+   * should not preempt anything.
+   */
+  public void testFairSharePreemptionFromMultiplePools() throws Exception {
+	// Create a bigger cluster than normal (3 tasktrackers instead of 2)
+	setUpCluster(1, 3, false);
+	// Enable preemption in scheduler
+	scheduler.preemptionEnabled = true;
+	// Set up pools file with a fair share preemption timeout of 1 minute
+	PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+	out.println("<?xml version=\"1.0\"?>");
+	out.println("<allocations>");
+	out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+	out.println("</allocations>");
+	out.close();
+	scheduler.getPoolManager().reloadAllocs();
+	 
+	// Grab pools (they'll be created even though they're not in the alloc file)
+	Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+	Pool pool2 = scheduler.getPoolManager().getPool("pool2");
+	Pool pool3 = scheduler.getPoolManager().getPool("pool3");
+
+	// Submit job 1. We advance time by 100 between each task tracker
+	// assignment stage so that job1's task on tt2 and job2's second task on
+	// tt3 are the ones deterministically preempted (being the latest launched
+	// tasks in their over-allocated jobs).
+	JobInProgress job1 = submitJob(JobStatus.RUNNING, 12, 0, "pool1");
+	advanceTime(100);
+	checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+	checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+	advanceTime(100);
+	checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+	advanceTime(100);
+	    
+	// Submit job 2. It should get the last 3 slots.
+	JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 0, "pool2");
+	advanceTime(100);
+	checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+	checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+	advanceTime(100);
+	checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+
+	advanceTime(100);
+	    
+	// Submit job 3.
+	JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 0, "pool3");
+	    
+	// Check that after 59 seconds, neither pool 2 nor pool 3 can preempt
+	advanceTime(59000);
+	assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
+			clock.getTime()));
+	assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
+	        clock.getTime()));
+	assertEquals(0, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
+	        clock.getTime()));
+	assertEquals(0, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
+	        clock.getTime()));
+	    
+	// Wait 2 more seconds, so that job 3 has now been in the system for 61s.
+	// Now pool 3 should be able to preempt 2 tasks (its share of 2 rounded
+	// down to its floor), while pool 2 should not (pool 1 is not asserted).
+	advanceTime(2000);
+	assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
+	        clock.getTime()));
+	assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
+	        clock.getTime()));
+	assertEquals(2, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
+	        clock.getTime()));
+	assertEquals(0, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
+	        clock.getTime()));
+	    
+	// Test that the tasks actually get preempted and we can assign new ones.
+	// This should preempt one task each from pool1 and pool2, leaving each at 2
+	scheduler.preemptTasksIfNecessary();
+	scheduler.update();
+	assertEquals(2, job2.runningMaps());  
+	assertEquals(2, job1.runningMaps());  
+	checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
+	checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+	assertNull(scheduler.assignTasks(tracker("tt1")));
+	assertNull(scheduler.assignTasks(tracker("tt2")));
+	assertNull(scheduler.assignTasks(tracker("tt3")));
+  }
+  
+  
+  /**
    * This test submits a job that takes all 4 slots, and then a second job in
    * a pool that has both a min share of 2 slots with a 60s timeout and a
    * fair share timeout of 60s. After 60 seconds, this pool will be starved