You are viewing a plain-text version of this content. The canonical link for it is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by jo...@apache.org on 2008/11/24 12:28:49 UTC
svn commit: r720168 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Author: johan
Date: Mon Nov 24 03:28:48 2008
New Revision: 720168
URL: http://svn.apache.org/viewvc?rev=720168&view=rev
Log:
HADOOP-4666. Launch reduces only after a few maps have run in the Fair Scheduler. (Matei Zaharia via johan)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720168&r1=720167&r2=720168&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Nov 24 03:28:48 2008
@@ -125,6 +125,9 @@
HADOOP-4640. Adds an input format that can split lzo compressed
text files. (johan)
+
+ HADOOP-4666. Launch reduces only after a few maps have run in the
+ Fair Scheduler. (Matei Zaharia via johan)
OPTIMIZATIONS
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=720168&r1=720167&r2=720168&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Mon Nov 24 03:28:48 2008
@@ -48,7 +48,6 @@
"org.apache.hadoop.mapred.FairScheduler");
protected PoolManager poolMgr;
-
protected LoadManager loadMgr;
protected TaskSelector taskSelector;
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -60,6 +59,7 @@
protected boolean useFifo; // Set if we want to revert to FIFO behavior
protected boolean assignMultiple; // Simultaneously assign map and reduce?
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+ protected boolean waitForMapsBeforeLaunchingReduces = true;
private Clock clock;
private boolean runBackgroundUpdates; // Can be set to false for testing
private EagerTaskInitializationListener eagerInitListener;
@@ -421,8 +421,12 @@
}
}
info.runningReduces = runningReduces;
- info.neededReduces = (totalReduces - runningReduces - finishedReduces
- + taskSelector.neededSpeculativeReduces(job));
+ if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
+ info.neededReduces = (totalReduces - runningReduces - finishedReduces
+ + taskSelector.neededSpeculativeReduces(job));
+ } else {
+ info.neededReduces = 0;
+ }
// If the job was marked as not runnable due to its user or pool having
// too many active jobs, set the neededMaps/neededReduces to 0. We still
// count runningMaps/runningReduces however so we can give it a deficit.
@@ -433,6 +437,18 @@
}
}
+ /**
+ * Has a job finished enough maps to allow launching its reduces?
+ */
+ protected boolean enoughMapsFinishedToRunReduces(
+ int finishedMaps, int totalMaps) {
+ if (waitForMapsBeforeLaunchingReduces) {
+ return finishedMaps >= Math.max(1, totalMaps * 0.05);
+ } else {
+ return true;
+ }
+ }
+
private void updateWeights() {
for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
JobInProgress job = entry.getKey();
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=720168&r1=720167&r2=720168&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Nov 24 03:28:48 2008
@@ -36,9 +36,9 @@
public class TestFairScheduler extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "build/contrib/streaming/test/data")).getAbsolutePath();
+ "build/contrib/streaming/test/data")).getAbsolutePath();
final static String ALLOC_FILE = new File(TEST_DIR,
- "test-pools").getAbsolutePath();
+ "test-pools").getAbsolutePath();
private static final String POOL_PROPERTY = "pool";
@@ -236,6 +236,7 @@
taskTrackerManager = new FakeTaskTrackerManager();
clock = new FakeClock();
scheduler = new FairScheduler(clock, false);
+ scheduler.waitForMapsBeforeLaunchingReduces = false;
scheduler.setConf(conf);
scheduler.setTaskTrackerManager(taskTrackerManager);
scheduler.start();
@@ -1059,6 +1060,30 @@
scheduler.infos.get(job2).reduceFairShare);
}
+ public void testWaitForMapsBeforeLaunchingReduces() {
+ // We have set waitForMapsBeforeLaunchingReduces to false by default in
+ // this class, so this should return true
+ assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));
+
+ // However, if we set waitForMapsBeforeLaunchingReduces to true, we should
+ // now no longer be able to assign reduces until 5 have finished
+ scheduler.waitForMapsBeforeLaunchingReduces = true;
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 100));
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 100));
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(2, 100));
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(3, 100));
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(4, 100));
+ assertTrue(scheduler.enoughMapsFinishedToRunReduces(5, 100));
+ assertTrue(scheduler.enoughMapsFinishedToRunReduces(6, 100));
+
+ // Also test some jobs that have very few maps, in which case we will
+ // wait for at least 1 map to finish
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 5));
+ assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 5));
+ assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 1));
+ assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
+ }
+
private void advanceTime(long time) {
clock.advance(time);
scheduler.update();