You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jo...@apache.org on 2008/11/24 12:28:49 UTC

svn commit: r720168 - in /hadoop/core/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Author: johan
Date: Mon Nov 24 03:28:48 2008
New Revision: 720168

URL: http://svn.apache.org/viewvc?rev=720168&view=rev
Log:
HADOOP-4666. Launch reduces only after a few maps have run in the Fair Scheduler. (Matei Zaharia via johan)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720168&r1=720167&r2=720168&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Nov 24 03:28:48 2008
@@ -125,6 +125,9 @@
 
     HADOOP-4640. Adds an input format that can split lzo compressed
     text files. (johan)
+    
+    HADOOP-4666. Launch reduces only after a few maps have run in the 
+    Fair Scheduler. (Matei Zaharia via johan)    
 
   OPTIMIZATIONS
 

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=720168&r1=720167&r2=720168&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Mon Nov 24 03:28:48 2008
@@ -48,7 +48,6 @@
       "org.apache.hadoop.mapred.FairScheduler");
   
   protected PoolManager poolMgr;
-  
   protected LoadManager loadMgr;
   protected TaskSelector taskSelector;
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -60,6 +59,7 @@
   protected boolean useFifo;      // Set if we want to revert to FIFO behavior
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+  protected boolean waitForMapsBeforeLaunchingReduces = true;
   private Clock clock;
   private boolean runBackgroundUpdates; // Can be set to false for testing
   private EagerTaskInitializationListener eagerInitListener;
@@ -421,8 +421,12 @@
         }
       }
       info.runningReduces = runningReduces;
-      info.neededReduces = (totalReduces - runningReduces - finishedReduces 
-                            + taskSelector.neededSpeculativeReduces(job));
+      if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
+        info.neededReduces = (totalReduces - runningReduces - finishedReduces 
+            + taskSelector.neededSpeculativeReduces(job));
+      } else {
+        info.neededReduces = 0;
+      }
       // If the job was marked as not runnable due to its user or pool having
       // too many active jobs, set the neededMaps/neededReduces to 0. We still
       // count runningMaps/runningReduces however so we can give it a deficit.
@@ -433,6 +437,18 @@
     }
   }
 
+  /**
+   * Has a job finished enough maps (at least 1 and at least 5% of them) to
+   * launch reduces? Always true if waiting is disabled or the job has no maps.
+   */
+  protected boolean enoughMapsFinishedToRunReduces(
+      int finishedMaps, int totalMaps) {
+    if (!waitForMapsBeforeLaunchingReduces || totalMaps <= 0) {
+      return true; // with no maps, max(1, ...) below could never be reached
+    }
+    return finishedMaps >= Math.max(1, totalMaps * 0.05);
+  }
+
   private void updateWeights() {
     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
       JobInProgress job = entry.getKey();

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=720168&r1=720167&r2=720168&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Nov 24 03:28:48 2008
@@ -36,9 +36,9 @@
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-  		"build/contrib/streaming/test/data")).getAbsolutePath();
+      "build/contrib/streaming/test/data")).getAbsolutePath();
   final static String ALLOC_FILE = new File(TEST_DIR, 
-  		"test-pools").getAbsolutePath();
+      "test-pools").getAbsolutePath();
   
   private static final String POOL_PROPERTY = "pool";
   
@@ -236,6 +236,7 @@
     taskTrackerManager = new FakeTaskTrackerManager();
     clock = new FakeClock();
     scheduler = new FairScheduler(clock, false);
+    scheduler.waitForMapsBeforeLaunchingReduces = false;
     scheduler.setConf(conf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     scheduler.start();
@@ -1059,6 +1060,30 @@
                scheduler.infos.get(job2).reduceFairShare);
   }
   
+  public void testWaitForMapsBeforeLaunchingReduces() {
+    // This class disables waitForMapsBeforeLaunchingReduces by default, so
+    // reduces may be assigned before any map has finished.
+    assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));
+
+    // With waiting turned on, a 100-map job must finish 5 maps (5%) before
+    // its reduces can be assigned.
+    scheduler.waitForMapsBeforeLaunchingReduces = true;
+    for (int finished = 0; finished < 5; finished++) {
+      assertFalse(scheduler.enoughMapsFinishedToRunReduces(finished, 100));
+    }
+    for (int finished = 5; finished <= 6; finished++) {
+      assertTrue(scheduler.enoughMapsFinishedToRunReduces(finished, 100));
+    }
+
+    // Jobs with very few maps (where 5% is under one map) must still wait
+    // for at least one map to finish.
+    int[] smallJobSizes = {5, 1};
+    for (int totalMaps : smallJobSizes) {
+      assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, totalMaps));
+      assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, totalMaps));
+    }
+  }
+  
   private void advanceTime(long time) {
     clock.advance(time);
     scheduler.update();