You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/02/01 02:39:13 UTC
svn commit: r1065885 - in /hadoop/mapreduce/branches/branch-0.22: CHANGES.txt
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Author: shv
Date: Tue Feb 1 01:39:13 2011
New Revision: 1065885
URL: http://svn.apache.org/viewvc?rev=1065885&view=rev
Log:
MAPREDUCE-2256. Merge -r 1065882:1065883 from trunk to branch 0.22.
Modified:
hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1065885&r1=1065884&r2=1065885&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Tue Feb 1 01:39:13 2011
@@ -432,11 +432,11 @@ Release 0.22.0 - Unreleased
MAPREDUCE-2282. Fix TestMRServerPorts for the changes in
TestHDFSServerPorts. (shv via szetszwo)
- MAPREDUCE-2238. Fix permissions handling to avoid leaving undeletable directories
- in local dirs. (todd)
+ MAPREDUCE-2238. Fix permissions handling to avoid leaving undeletable
+ directories in local dirs. (todd)
- MAPREDUCE-2277. TestCapacitySchedulerWithJobTracker needs to wait for jobs to
- complete before testing status. (todd)
+ MAPREDUCE-2277. TestCapacitySchedulerWithJobTracker needs to wait for jobs
+ to complete before testing status. (todd)
MAPREDUCE-2253. Servlets should specify content type (todd)
@@ -445,6 +445,10 @@ Release 0.22.0 - Unreleased
MAPREDUCE-1754. Replace mapred.persmissions.supergroup with an
acl : mapreduce.cluster.administrators (Amareshwari Sriramadasu via shv)
+ MAPREDUCE-2256. FairScheduler fairshare preemption from multiple pools may
+ preempt all tasks from one pool, causing that pool to go below its fair
+ share.
+ (Priyo Mustafi via shv)
+
Release 0.21.1 - Unreleased
NEW FEATURES
Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1065885&r1=1065884&r2=1065885&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Feb 1 01:39:13 2011
@@ -817,7 +817,8 @@ public class FairScheduler extends TaskS
JobInProgress job = taskTrackerManager.getJob(jobID);
Pool pool = poolMgr.getPool(job);
PoolSchedulable sched = pool.getSchedulable(taskType);
- if (tasksLeft.get(pool) > sched.getFairShare()) {
+ int tasksLeftForPool = tasksLeft.get(pool);
+ if (tasksLeftForPool > sched.getFairShare()) {
eventLog.log("PREEMPT", status.getTaskID(),
status.getTaskTracker());
try {
@@ -825,6 +826,9 @@ public class FairScheduler extends TaskS
tasksToPreempt--;
if (tasksToPreempt == 0)
break;
+
+ // reduce tasks left for pool
+ tasksLeft.put(pool, --tasksLeftForPool);
} catch (IOException e) {
LOG.error("Failed to kill task " + status.getTaskID(), e);
}
Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1065885&r1=1065884&r2=1065885&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Feb 1 01:39:13 2011
@@ -1957,6 +1957,101 @@ public class TestFairScheduler extends T
}
/**
+ * This test runs on a 3-node (6-slot) cluster to allow 3 pools with fair
+ * shares equal to 2 slots to coexist (which makes the half-fair-share
+ * of each pool equal to 1 so that fair share preemption can kick in).
+ *
+ * The test first starts job 1, which takes 3 map slots and 0 reduce slots,
+ * in pool 1. We then submit job 2 in pool 2, which takes 3 map slots and zero
+ * reduce slots. Finally, we submit a third job, job 3 in pool 3, which gets no
+ * slots. At this point the fair share of each pool will be 6/3 = 2 slots.
+ * Pools 1 and 2 will be above their fair share and pool 3 will be below half
+ * its fair share. Therefore pool 3 should preempt tasks from both pools 1 and
+ * 2 (after a timeout), but pools 1 and 2 shouldn't preempt anything.
+ *
+ * Regression test for MAPREDUCE-2256: the two preempted tasks must come one
+ * from each of pools 1 and 2, rather than both from a single pool (which
+ * would push that pool below its fair share).
+ */
+ public void testFairSharePreemptionFromMultiplePools() throws Exception {
+ // Create a bigger cluster than normal (3 tasktrackers instead of 2)
+ setUpCluster(1, 3, false);
+ // Enable preemption in scheduler
+ scheduler.preemptionEnabled = true;
+ // Set up pools file with a fair share preemption timeout of 1 minute
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+ out.println("</allocations>");
+ out.close();
+ scheduler.getPoolManager().reloadAllocs();
+
+ // Grab pools (they'll be created even though they're not in the alloc file)
+ Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+ Pool pool2 = scheduler.getPoolManager().getPool("pool2");
+ Pool pool3 = scheduler.getPoolManager().getPool("pool3");
+
+ // Submit job 1. We advance time by 100 between each task tracker
+ // assignment stage so that launch times are distinct. This makes job1's
+ // task on tt2 its most recently launched task, and therefore the one that
+ // is deterministically preempted from pool1 (being the latest launched
+ // task in an over-allocated job).
+ JobInProgress job1 = submitJob(JobStatus.RUNNING, 12, 0, "pool1");
+ advanceTime(100);
+ checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ advanceTime(100);
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ advanceTime(100);
+
+ // Submit job 2. It should get the last 3 slots; its latest launched task
+ // (on tt3) is the one expected to be preempted from pool2.
+ JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 0, "pool2");
+ advanceTime(100);
+ checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+ checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+ advanceTime(100);
+ checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+
+ advanceTime(100);
+
+ // Submit job 3. The cluster is full, so it gets no slots yet.
+ JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 0, "pool3");
+
+ // Check that after 59 seconds, no pool can preempt yet
+ advanceTime(59000);
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
+ clock.getTime()));
+
+ // Wait 2 more seconds, so that job 3 has now been in the system for 61s.
+ // Now pool 3 should be able to preempt 2 map tasks (its fair share of 2
+ // slots, rounded down), but pools 1 and 2 still shouldn't preempt anything.
+ advanceTime(2000);
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
+ clock.getTime()));
+ assertEquals(2, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
+ clock.getTime()));
+ assertEquals(0, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
+ clock.getTime()));
+
+ // Test that the tasks actually get preempted and we can assign new ones.
+ // This should preempt one task each from pool1 and pool2, leaving both at
+ // their fair share of 2 instead of draining a single pool to 1.
+ scheduler.preemptTasksIfNecessary();
+ scheduler.update();
+ assertEquals(2, job2.runningMaps());
+ assertEquals(2, job1.runningMaps());
+ // The freed slots (job1's on tt2, job2's on tt3) go to job 3.
+ checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
+ checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+ // The cluster is full again, so no further tasks should be assigned.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ assertNull(scheduler.assignTasks(tracker("tt3")));
+ }
+
+
+ /**
* This test submits a job that takes all 4 slots, and then a second job in
* a pool that has both a min share of 2 slots with a 60s timeout and a
* fair share timeout of 60s. After 60 seconds, this pool will be starved